Merge pull request #39 from filecoin-project/next

Next Staging Branch
This commit is contained in:
Łukasz Magiera 2020-08-06 01:21:00 +02:00 committed by GitHub
commit be1e7d33d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 633 additions and 115 deletions

View File

@ -7,6 +7,7 @@ import (
"io"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
@ -131,7 +132,7 @@ func (t *DealInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{162}); err != nil {
if _, err := w.Write([]byte{163}); err != nil {
return err
}
@ -166,6 +167,22 @@ func (t *DealInfo) MarshalCBOR(w io.Writer) error {
if err := t.DealSchedule.MarshalCBOR(w); err != nil {
return err
}
// t.KeepUnsealed (bool) (bool)
if len("KeepUnsealed") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"KeepUnsealed\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("KeepUnsealed")))); err != nil {
return err
}
if _, err := w.Write([]byte("KeepUnsealed")); err != nil {
return err
}
if err := cbg.WriteBool(w, t.KeepUnsealed); err != nil {
return err
}
return nil
}
@ -224,6 +241,24 @@ func (t *DealInfo) UnmarshalCBOR(r io.Reader) error {
}
}
// t.KeepUnsealed (bool) (bool)
case "KeepUnsealed":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajOther {
return fmt.Errorf("booleans must be major type 7")
}
switch extra {
case 20:
t.KeepUnsealed = false
case 21:
t.KeepUnsealed = true
default:
return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra)
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
@ -382,7 +417,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{180}); err != nil {
if _, err := w.Write([]byte{182}); err != nil {
return err
}
@ -607,6 +642,38 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.PreCommitInfo (miner.SectorPreCommitInfo) (struct)
if len("PreCommitInfo") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"PreCommitInfo\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("PreCommitInfo")))); err != nil {
return err
}
if _, err := w.Write([]byte("PreCommitInfo")); err != nil {
return err
}
if err := t.PreCommitInfo.MarshalCBOR(w); err != nil {
return err
}
// t.PreCommitDeposit (big.Int) (struct)
if len("PreCommitDeposit") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"PreCommitDeposit\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("PreCommitDeposit")))); err != nil {
return err
}
if _, err := w.Write([]byte("PreCommitDeposit")); err != nil {
return err
}
if err := t.PreCommitDeposit.MarshalCBOR(w); err != nil {
return err
}
// t.PreCommitMessage (cid.Cid) (struct)
if len("PreCommitMessage") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"PreCommitMessage\" was too long")
@ -1065,6 +1132,38 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
if _, err := io.ReadFull(br, t.Proof); err != nil {
return err
}
// t.PreCommitInfo (miner.SectorPreCommitInfo) (struct)
case "PreCommitInfo":
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
t.PreCommitInfo = new(miner.SectorPreCommitInfo)
if err := t.PreCommitInfo.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.PreCommitInfo pointer: %w", err)
}
}
}
// t.PreCommitDeposit (big.Int) (struct)
case "PreCommitDeposit":
{
if err := t.PreCommitDeposit.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.PreCommitDeposit: %w", err)
}
}
// t.PreCommitMessage (cid.Cid) (struct)
case "PreCommitMessage":
@ -1251,7 +1350,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
return err
}
if extra > cbg.MaxLength {
if extra > cbg.MaxLength+1 { // +1 placed here to recover broken state machines in calibration net; feel free to drop
return fmt.Errorf("t.Log: array too large (%d)", extra)
}

View File

@ -50,7 +50,7 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
proposal, err := api.StateMarketStorageDeal(ctx, p.DealInfo.DealID, tok)
if err != nil {
return &ErrApi{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)}
return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)}
}
if proposal.PieceCID != p.Piece.PieceCID {
@ -92,9 +92,9 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, t
if pci != nil {
if pci.Info.SealRandEpoch != si.TicketEpoch {
return &ErrBadTicket{}
return &ErrBadTicket{xerrors.Errorf("bad ticket epoch: %d != %d", pci.Info.SealRandEpoch, si.TicketEpoch)}
}
return &ErrPrecommitOnChain{}
return &ErrPrecommitOnChain{xerrors.Errorf("precommit already on chain")}
}
return nil

View File

@ -6,7 +6,7 @@ import (
)
// Epochs
const SealRandomnessLookback = miner.ChainFinalityish
const SealRandomnessLookback = miner.ChainFinality
// Epochs
func SealRandomnessLookbackLimit(spt abi.RegisteredSealProof) abi.ChainEpoch {

116
fsm.go
View File

@ -34,8 +34,16 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
// Sealing
UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
Packing: planOne(on(SectorPacked{}, PreCommit1)),
UndefinedSectorState: planOne(
on(SectorStart{}, Empty),
on(SectorStartCC{}, Packing),
),
Empty: planOne(on(SectorAddPiece{}, WaitDeals)),
WaitDeals: planOne(
on(SectorAddPiece{}, WaitDeals),
on(SectorStartPacking{}, Packing),
),
Packing: planOne(on(SectorPacked{}, PreCommit1)),
PreCommit1: planOne(
on(SectorPreCommit1{}, PreCommit2),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
@ -96,6 +104,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryComputeProof{}, Committing),
on(SectorRetryInvalidProof{}, Committing),
on(SectorRetryPreCommitWait{}, PreCommitWait),
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorRetryPreCommit{}, PreCommitting),
),
FinalizeFailed: planOne(
on(SectorRetryFinalize{}, FinalizeSector),
@ -141,6 +151,17 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
l.Trace = fmt.Sprintf("%+v", err)
}
if len(state.Log) > 8000 {
log.Warnw("truncating sector log", "sector", state.SectorNumber)
state.Log[2000] = Log{
Timestamp: uint64(time.Now().Unix()),
Message: "truncating log (above 8000 entries)",
Kind: fmt.Sprintf("truncate"),
}
state.Log = append(state.Log[:2000], state.Log[:6000]...)
}
state.Log = append(state.Log, l)
}
@ -158,49 +179,56 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
/*
* Empty
| |
| v
*<- Packing <- incoming
| |
| v
*<- PreCommit1 <--> SealPreCommit1Failed
| | ^ ^^
| | *----------++----\
| v v || |
*<- PreCommit2 --------++--> SealPreCommit2Failed
| | ||
| v /-------/|
* PreCommitting <-----+---> PreCommitFailed
| | | ^
| v | |
*<- WaitSeed -----------+-----/
| ||| ^ |
| ||| \--------*-----/
| ||| |
| vvv v----+----> ComputeProofFailed
*<- Committing |
| | ^--> CommitFailed
| v ^
*<- CommitWait ---/
| |
| v
| FinalizeSector <--> FinalizeFailed
| |
| v
*<- Proving
|
v
FailedUnrecoverable
* Empty <- incoming deals
| |
| v
*<- WaitDeals <- incoming deals
| |
| v
*<- Packing <- incoming committed capacity
| |
| v
*<- PreCommit1 <--> SealPreCommit1Failed
| | ^ ^^
| | *----------++----\
| v v || |
*<- PreCommit2 --------++--> SealPreCommit2Failed
| | ||
| v /-------/|
* PreCommitting <-----+---> PreCommitFailed
| | | ^
| v | |
*<- WaitSeed -----------+-----/
| ||| ^ |
| ||| \--------*-----/
| ||| |
| vvv v----+----> ComputeProofFailed
*<- Committing |
| | ^--> CommitFailed
| v ^
*<- CommitWait ---/
| |
| v
| FinalizeSector <--> FinalizeFailed
| |
| v
*<- Proving
|
v
FailedUnrecoverable
UndefinedSectorState <- ¯\_()_/¯
| ^
*---------------------/
UndefinedSectorState <- ¯\_()_/¯
| ^
*---------------------/
*/
switch state.State {
// Happy path
case Empty:
fallthrough
case WaitDeals:
log.Infof("Waiting for deals %d", state.SectorNumber)
case Packing:
return m.handlePacking, nil
case PreCommit1:
@ -236,10 +264,11 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Post-seal
case Proving:
// TODO: track sector health / expiration
log.Infof("Proving sector %d", state.SectorNumber)
return m.handleProvingSector, nil
case Removing:
return m.handleRemoving, nil
case Removed:
return nil, nil
// Faults
case Faulty:
@ -355,6 +384,11 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema
return nil
}
_, ok := events[0].User.(Ignorable)
if ok {
return nil
}
return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
}
}

View File

@ -1,10 +1,13 @@
package sealing
import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-storage/storage"
)
type mutator interface {
@ -18,6 +21,10 @@ type globalMutator interface {
applyGlobal(state *SectorInfo) bool
}
type Ignorable interface {
Ignore()
}
// Global events
type SectorRestart struct{}
@ -50,15 +57,39 @@ func (evt SectorForceState) applyGlobal(state *SectorInfo) bool {
type SectorStart struct {
ID abi.SectorNumber
SectorType abi.RegisteredSealProof
Pieces []Piece
}
func (evt SectorStart) apply(state *SectorInfo) {
state.SectorNumber = evt.ID
state.SectorType = evt.SectorType
}
type SectorStartCC struct {
ID abi.SectorNumber
SectorType abi.RegisteredSealProof
Pieces []Piece
}
func (evt SectorStartCC) apply(state *SectorInfo) {
state.SectorNumber = evt.ID
state.Pieces = evt.Pieces
state.SectorType = evt.SectorType
}
type SectorAddPiece struct {
NewPiece Piece
}
func (evt SectorAddPiece) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.NewPiece)
}
type SectorStartPacking struct{}
func (evt SectorStartPacking) apply(*SectorInfo) {}
func (evt SectorStartPacking) Ignore() {}
type SectorPacked struct{ FillerPieces []abi.PieceInfo }
func (evt SectorPacked) apply(state *SectorInfo) {
@ -129,11 +160,15 @@ func (evt SectorChainPreCommitFailed) FormatError(xerrors.Printer) (next error)
func (evt SectorChainPreCommitFailed) apply(*SectorInfo) {}
type SectorPreCommitted struct {
Message cid.Cid
Message cid.Cid
PreCommitDeposit big.Int
PreCommitInfo miner.SectorPreCommitInfo
}
func (evt SectorPreCommitted) apply(state *SectorInfo) {
state.PreCommitMessage = &evt.Message
state.PreCommitDeposit = evt.PreCommitDeposit
state.PreCommitInfo = &evt.PreCommitInfo
}
type SectorSeedReady struct {

View File

@ -6,7 +6,6 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/abi"
nr "github.com/filecoin-project/storage-fsm/lib/nullreader"
@ -46,12 +45,6 @@ func (m *Sealing) PledgeSector() error {
size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
log.Error(err)
return
}
sid, err := m.sc.Next()
if err != nil {
log.Errorf("%+v", err)
@ -77,7 +70,7 @@ func (m *Sealing) PledgeSector() error {
}
}
if err := m.newSector(sid, rt, ps); err != nil {
if err := m.newSectorCC(sid, ps); err != nil {
log.Errorf("%+v", err)
return
}

8
go.mod
View File

@ -9,16 +9,16 @@ require (
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 // indirect
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246
github.com/filecoin-project/specs-actors v0.6.0
github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15
github.com/filecoin-project/specs-actors v0.7.3-0.20200716231407-60a2ae96d2e6
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242 // indirect
github.com/ipfs/go-ipld-cbor v0.0.5-0.20200204214505-252690b78669 // indirect
github.com/ipfs/go-log/v2 v2.0.3
github.com/stretchr/testify v1.4.0
github.com/whyrusleeping/cbor-gen v0.0.0-20200414195334-429a0b5e922e
github.com/whyrusleeping/cbor-gen v0.0.0-20200710004633-5379fc63235d
go.uber.org/zap v1.14.1 // indirect
golang.org/x/crypto v0.0.0-20200317142112-1b76d66859c6 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect

21
go.sum
View File

@ -38,8 +38,9 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060 h1:/3
github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw=
github.com/filecoin-project/go-bitfield v0.0.1 h1:Xg/JnrqqE77aJVKdbEyR04n9FZQWhwrN+buDgQCVpZU=
github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.3/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1 h1:xuHlrdznafh7ul5t4xEncnA4qgpQvJZEw+mr98eqHXw=
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
@ -55,13 +56,16 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 h1:NfYQRmVRe0LzlNbK5Ket3vbBOwFD5TvtcNtfo/Sd8mg=
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY=
github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15 h1:miw6hiusb/MkV1ryoqUKKWnvHhPW00AYtyeCj0L8pqo=
github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15/go.mod h1:salgVdX7qeXFo/xaiEQE29J4pPkjn71T0kt0n+VDBzo=
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
github.com/filecoin-project/specs-actors v0.3.0 h1:QxgAuTrZr5TPqjyprZk0nTYW5o0JWpzbb5v+4UHHvN0=
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
github.com/filecoin-project/specs-actors v0.6.0 h1:IepUsmDGY60QliENVTkBTAkwqGWw9kNbbHOcU/9oiC0=
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
github.com/filecoin-project/specs-actors v0.6.1/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
github.com/filecoin-project/specs-actors v0.7.3-0.20200716231407-60a2ae96d2e6 h1:F+GcBdKPdW/wTv6bMJxG9Zj1dc0UGkO6uNOQmKP/g1o=
github.com/filecoin-project/specs-actors v0.7.3-0.20200716231407-60a2ae96d2e6/go.mod h1:JOMUa7EijvpOO4ofD1yeHNmqohkmmnhTvz/IpB6so4c=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@ -102,6 +106,8 @@ github.com/ipfs/go-cid v0.0.4-0.20191112011718-79e75dffeb10/go.mod h1:/BYOuUoxkE
github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M=
github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU=
github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog=
github.com/ipfs/go-cid v0.0.6 h1:go0y+GcDOGeJIV01FeBsta4FHngoA4Wz7KMeLkXAhMs=
github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
github.com/ipfs/go-datastore v0.4.4 h1:rjvQ9+muFaJ+QZ7dN5B1MSDNQ0JVZKkkES/rMZmA8X8=
github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
@ -185,9 +191,13 @@ github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc=
github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4=
github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM=
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.9/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
@ -243,8 +253,11 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:X
github.com/whyrusleeping/cbor-gen v0.0.0-20200206220010-03c9665e2a66/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI=
github.com/whyrusleeping/cbor-gen v0.0.0-20200414195334-429a0b5e922e h1:JY8o/ebUUrCYetWmjRCNghxC59cOEaili83rxPRQCLw=
github.com/whyrusleeping/cbor-gen v0.0.0-20200414195334-429a0b5e922e/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI=
github.com/whyrusleeping/cbor-gen v0.0.0-20200710004633-5379fc63235d h1:wSxKhvbN7kUoP0sfRS+w2tWr45qlU8409i94hHLOT8w=
github.com/whyrusleeping/cbor-gen v0.0.0-20200710004633-5379fc63235d/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xorcare/golden v0.6.0/go.mod h1:7T39/ZMvaSEZlBPoYfVFmsBLmUl3uz9IuzWj/U6FtvQ=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8=

View File

@ -3,6 +3,8 @@ package sealing
import (
"context"
"io"
"sync"
"time"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
@ -26,15 +28,22 @@ const SectorStorePrefix = "/sectors"
var log = logging.Logger("sectors")
type SectorLocation struct {
Deadline uint64
Partition uint64
}
type SealingAPI interface {
StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error)
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
StateMinerSectorSize(context.Context, address.Address, TipSetToken) (abi.SectorSize, error)
StateMinerWorkerAddress(ctx context.Context, maddr address.Address, tok TipSetToken) (address.Address, error)
StateMinerDeadlines(ctx context.Context, maddr address.Address, tok TipSetToken) (*miner.Deadlines, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, abi.SectorNumber, TipSetToken) (big.Int, error)
StateMinerDeadlines(ctx context.Context, maddr address.Address, tok TipSetToken) ([]*miner.Deadline, error)
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error)
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, gasPrice big.Int, gasLimit int64, params []byte) (cid.Cid, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
@ -53,10 +62,28 @@ type Sealing struct {
sc SectorIDCounter
verif ffiwrapper.Verifier
pcp PreCommitPolicy
pcp PreCommitPolicy
unsealedInfoMap UnsealedSectorMap
upgradeLk sync.Mutex
toUpgrade map[abi.SectorNumber]struct{}
getSealDelay GetSealingDelayFunc
}
func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy) *Sealing {
type UnsealedSectorMap struct {
infos map[abi.SectorNumber]UnsealedSectorInfo
mux sync.Mutex
}
type UnsealedSectorInfo struct {
numDeals uint64
// stored should always equal sum of pieceSizes.Padded()
stored abi.PaddedPieceSize
pieceSizes []abi.UnpaddedPieceSize
}
func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gsd GetSealingDelayFunc) *Sealing {
s := &Sealing{
api: api,
events: events,
@ -66,6 +93,13 @@ func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batc
sc: sc,
verif: verif,
pcp: pcp,
unsealedInfoMap: UnsealedSectorMap{
infos: make(map[abi.SectorNumber]UnsealedSectorInfo),
mux: sync.Mutex{},
},
toUpgrade: map[abi.SectorNumber]struct{}{},
getSealDelay: gsd,
}
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
@ -85,63 +119,182 @@ func (m *Sealing) Run(ctx context.Context) error {
func (m *Sealing) Stop(ctx context.Context) error {
return m.sectors.Stop(ctx)
}
func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.SectorNumber, offset uint64, err error) {
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
log.Infof("Adding piece for deal %d", d.DealID)
if (padreader.PaddedSize(uint64(size))) != size {
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
sid, err := m.sc.Next()
if err != nil {
return 0, 0, xerrors.Errorf("getting sector number: %w", err)
if size > abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() {
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
}
err = m.sealer.NewSector(context.TODO(), m.minerSector(sid)) // TODO: Put more than one thing in a sector
m.unsealedInfoMap.mux.Lock()
sid, pads, err := m.getSectorAndPadding(size)
if err != nil {
return 0, 0, xerrors.Errorf("initializing sector: %w", err)
m.unsealedInfoMap.mux.Unlock()
return 0, 0, xerrors.Errorf("getting available sector: %w", err)
}
// offset hard-coded to 0 since we only put one thing in a sector for now
return sid, 0, nil
for _, p := range pads {
err = m.addPiece(ctx, sid, p.Unpadded(), m.pledgeReader(p.Unpadded()), nil)
if err != nil {
m.unsealedInfoMap.mux.Unlock()
return 0, 0, xerrors.Errorf("writing pads: %w", err)
}
}
offset := m.unsealedInfoMap.infos[sid].stored
err = m.addPiece(ctx, sid, size, r, &d)
if err != nil {
m.unsealedInfoMap.mux.Unlock()
return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
}
m.unsealedInfoMap.mux.Unlock()
if m.unsealedInfoMap.infos[sid].numDeals == getDealPerSectorLimit(m.sealer.SectorSize()) {
if err := m.StartPacking(sid); err != nil {
return 0, 0, xerrors.Errorf("start packing: %w", err)
}
}
return sid, offset, nil
}
func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, d DealInfo) error {
log.Infof("Seal piece for deal %d", d.DealID)
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r)
// Caller should hold m.unsealedInfoMap.mux
func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
log.Infof("Adding piece to sector %d", sectorID)
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err)
return xerrors.Errorf("writing piece: %w", err)
}
piece := Piece{
Piece: ppi,
DealInfo: di,
}
err = m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece})
if err != nil {
return err
}
ui := m.unsealedInfoMap.infos[sectorID]
num := m.unsealedInfoMap.infos[sectorID].numDeals
if di != nil {
num = num + 1
}
m.unsealedInfoMap.infos[sectorID] = UnsealedSectorInfo{
numDeals: num,
stored: ui.stored + piece.Piece.Size,
pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()),
}
return nil
}
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorRemove{})
}
// Caller should NOT hold m.unsealedInfoMap.mux
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
log.Infof("Starting packing sector %d", sectorID)
err := m.sectors.Send(uint64(sectorID), SectorStartPacking{})
if err != nil {
return err
}
m.unsealedInfoMap.mux.Lock()
delete(m.unsealedInfoMap.infos, sectorID)
m.unsealedInfoMap.mux.Unlock()
return nil
}
// Caller should hold m.unsealedInfoMap.mux
func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
ss := abi.PaddedPieceSize(m.sealer.SectorSize())
for k, v := range m.unsealedInfoMap.infos {
pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded())
if v.stored+size.Padded()+padLength <= ss {
return k, pads, nil
}
}
ns, err := m.newSector()
if err != nil {
return 0, nil, err
}
m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{
numDeals: 0,
stored: 0,
pieceSizes: nil,
}
return ns, nil, nil
}
// newSector creates a new sector for deal storage
func (m *Sealing) newSector() (abi.SectorNumber, error) {
sid, err := m.sc.Next()
if err != nil {
return 0, xerrors.Errorf("getting sector number: %w", err)
}
err = m.sealer.NewSector(context.TODO(), m.minerSector(sid))
if err != nil {
return 0, xerrors.Errorf("initializing sector: %w", err)
}
rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
return 0, xerrors.Errorf("bad sector size: %w", err)
}
log.Infof("Creating sector %d", sid)
err = m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: rt,
})
if err != nil {
return 0, xerrors.Errorf("starting the sector fsm: %w", err)
}
sd, err := m.getSealDelay()
if err != nil {
return 0, xerrors.Errorf("getting the sealing delay: %w", err)
}
if sd > 0 {
timer := time.NewTimer(sd)
go func() {
<-timer.C
m.StartPacking(sid)
}()
}
return sid, nil
}
// newSectorCC accepts a slice of pieces with no deal (junk data)
func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error {
rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
return xerrors.Errorf("bad sector size: %w", err)
}
return m.newSector(sectorID, rt, []Piece{
{
Piece: ppi,
DealInfo: &d,
},
})
}
// newSector accepts a slice of pieces which will have a deal associated with
// them (in the event of a storage deal) or no deal (in the event of sealing
// garbage data)
func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredSealProof, pieces []Piece) error {
log.Infof("Start sealing %d", sid)
return m.sectors.Send(uint64(sid), SectorStart{
log.Infof("Creating CC sector %d", sid)
return m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
Pieces: pieces,
SectorType: rt,
})
}
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorRemove{})
}
func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID {
mid, err := address.IDFromAddress(m.maddr)
if err != nil {
@ -157,3 +310,11 @@ func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID {
func (m *Sealing) Address() address.Address {
return m.maddr
}
func getDealPerSectorLimit(size abi.SectorSize) uint64 {
if size < 64<<30 {
return 256
} else {
return 512
}
}

View File

@ -7,6 +7,7 @@ const (
// happy path
Empty SectorState = "Empty"
WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector
Packing SectorState = "Packing" // sector not in sealStore, and not on chain
PreCommit1 SectorState = "PreCommit1" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit1

View File

@ -81,6 +81,8 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired error: %w", err)})
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)})
case *ErrNoPrecommit:
return ctx.Send(SectorRetryPreCommit{})
case *ErrPrecommitOnChain:
// noop
default:
@ -152,10 +154,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired error: %w", err)})
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)})
case nil:
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
case *ErrPrecommitOnChain:
// noop, this is expected
default:
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
}
}
@ -180,8 +184,10 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
case *ErrPrecommitOnChain:
log.Errorf("no precommit on chain, will retry: %+v", err)
return ctx.Send(SectorRetryPreCommitWait{})
case *ErrNoPrecommit:
return ctx.Send(SectorRetryPreCommit{})
default:
return xerrors.Errorf("checkCommit sanity check error: %w", err)
return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err)
}
}

View File

@ -40,7 +40,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber)
}
fillerPieces, err := m.pledgeSector(ctx.Context(), m.minerSector(sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
if err != nil {
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
}
@ -169,18 +169,27 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
DealIDs: sector.dealIDs(),
}
depositMinimum := m.tryUpgradeSector(ctx.Context(), params)
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize pre-commit sector parameters: %w", err)})
}
log.Info("submitting precommit for sector: ", sector.SectorNumber)
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.PreCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
collateral, err := m.api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, tok)
if err != nil {
return xerrors.Errorf("getting initial pledge collateral: %w", err)
}
deposit := big.Max(depositMinimum, collateral)
log.Infof("submitting precommit for sector %d (deposit: %s): ", sector.SectorNumber, deposit)
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.PreCommitSector, deposit, big.NewInt(0), 0, enc.Bytes())
if err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}
return ctx.Send(SectorPreCommitted{Message: mcid})
return ctx.Send(SectorPreCommitted{Message: mcid, PreCommitDeposit: deposit, PreCommitInfo: *params})
}
func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error {
@ -206,7 +215,13 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf
}
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, sector.PreCommitTipSet)
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
if err != nil {
return xerrors.Errorf("getting precommit info: %w", err)
}
@ -216,7 +231,15 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
randHeight := pci.PreCommitEpoch + miner.PreCommitChallengeDelay
err = m.events.ChainAt(func(ectx context.Context, tok TipSetToken, curH abi.ChainEpoch) error {
err = m.events.ChainAt(func(ectx context.Context, _ TipSetToken, curH abi.ChainEpoch) error {
// in case of null blocks the randomness can land after the tipset we
// get from the events API
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
buf := new(bytes.Buffer)
if err := m.maddr.MarshalCBOR(buf); err != nil {
return err
@ -291,13 +314,26 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return nil
}
collateral, err := m.api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, sector.SectorNumber, tok)
collateral, err := m.api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, *sector.PreCommitInfo, tok)
if err != nil {
return xerrors.Errorf("getting initial pledge collateral: %w", err)
}
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
if err != nil {
return xerrors.Errorf("getting precommit info: %w", err)
}
if pci == nil {
return ctx.Send(SectorCommitFailed{error: xerrors.Errorf("precommit info not found on chain")})
}
collateral = big.Sub(collateral, pci.PreCommitDeposit)
if collateral.LessThan(big.Zero()) {
collateral = big.Zero()
}
// TODO: check seed / ticket are up to date
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.NewInt(1), 1000000, enc.Bytes())
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.NewInt(0), 0, enc.Bytes())
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}
@ -334,9 +370,23 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), nil); err != nil {
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.keepUnsealedRanges(false)); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
return ctx.Send(SectorFinalized{})
}
func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: track sector health / expiration
log.Infof("Proving sector %d", sector.SectorNumber)
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorNumber), sector.keepUnsealedRanges(true)); err != nil {
log.Error(err)
}
// TODO: Watch termination
// TODO: Auto-extend if set
return nil
}

View File

@ -3,11 +3,14 @@ package sealing
import (
"bytes"
"context"
"time"
"github.com/ipfs/go-cid"
sectorstorage "github.com/filecoin-project/sector-storage"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/filecoin-project/specs-storage/storage"
)
@ -28,6 +31,7 @@ type Piece struct {
type DealInfo struct {
DealID abi.DealID
DealSchedule DealSchedule
KeepUnsealed bool
}
// DealSchedule communicates the time interval of a storage deal. The deal must
@ -67,6 +71,8 @@ type SectorInfo struct {
CommR *cid.Cid
Proof []byte
PreCommitInfo *miner.SectorPreCommitInfo
PreCommitDeposit big.Int
PreCommitMessage *cid.Cid
PreCommitTipSet TipSetToken
@ -137,6 +143,32 @@ func (t *SectorInfo) sealingCtx(ctx context.Context) context.Context {
return ctx
}
// Returns list of offset/length tuples of sector data ranges which clients
// requested to keep unsealed
func (t *SectorInfo) keepUnsealedRanges(invert bool) []storage.Range {
var out []storage.Range
var at abi.UnpaddedPieceSize
for _, piece := range t.Pieces {
psize := piece.Piece.Size.Unpadded()
at += psize
if piece.DealInfo == nil {
continue
}
if piece.DealInfo.KeepUnsealed == invert {
continue
}
out = append(out, storage.Range{
Offset: at - psize,
Size: psize,
})
}
return out
}
type SectorIDCounter interface {
Next() (abi.SectorNumber, error)
}
@ -155,6 +187,8 @@ type MessageReceipt struct {
GasUsed int64
}
type GetSealingDelayFunc func() (time.Duration, error)
func (mr *MessageReceipt) Equals(o *MessageReceipt) bool {
return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed
}

92
upgrade_queue.go Normal file
View File

@ -0,0 +1,92 @@
package sealing
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
)
func (m *Sealing) MarkForUpgrade(id abi.SectorNumber) error {
m.upgradeLk.Lock()
defer m.upgradeLk.Unlock()
_, found := m.toUpgrade[id]
if found {
return xerrors.Errorf("sector %d already marked for upgrade", id)
}
si, err := m.GetSectorInfo(id)
if err != nil {
return xerrors.Errorf("getting sector info: %w", err)
}
if si.State != Proving {
return xerrors.Errorf("can't mark sectors not in the 'Proving' state for upgrade")
}
if len(si.Pieces) != 1 {
return xerrors.Errorf("not a committed-capacity sector, expected 1 piece")
}
if si.Pieces[0].DealInfo != nil {
return xerrors.Errorf("not a committed-capacity sector, has deals")
}
// TODO: more checks to match actor constraints
m.toUpgrade[id] = struct{}{}
return nil
}
func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreCommitInfo) big.Int {
replace := m.maybeUpgradableSector()
if replace != nil {
loc, err := m.api.StateSectorPartition(ctx, m.maddr, *replace, nil)
if err != nil {
log.Errorf("error calling StateSectorPartition for replaced sector: %+v", err)
return big.Zero()
}
params.ReplaceCapacity = true
params.ReplaceSectorNumber = *replace
params.ReplaceSectorDeadline = loc.Deadline
params.ReplaceSectorPartition = loc.Partition
ri, err := m.GetSectorInfo(*replace)
if err != nil {
log.Errorf("error calling GetSectorInfo for replaced sector: %+v", err)
return big.Zero()
}
if params.Expiration < ri.PreCommitInfo.Expiration {
// TODO: Some limit on this
params.Expiration = ri.PreCommitInfo.Expiration
}
return ri.PreCommitDeposit
}
return big.Zero()
}
func (m *Sealing) maybeUpgradableSector() *abi.SectorNumber {
m.upgradeLk.Lock()
defer m.upgradeLk.Unlock()
for number := range m.toUpgrade {
// TODO: checks to match actor constraints
// this one looks good
/*if checks */
{
delete(m.toUpgrade, number)
return &number
}
}
return nil
}