diff --git a/fsm.go b/fsm.go index e0c53cfce..ffffffd74 100644 --- a/fsm.go +++ b/fsm.go @@ -370,6 +370,11 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema return nil } + _, ok := events[0].User.(Ignorable) + if ok { + return nil + } + return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0]) } } diff --git a/fsm_events.go b/fsm_events.go index 91f8336e5..c4278991e 100644 --- a/fsm_events.go +++ b/fsm_events.go @@ -21,6 +21,10 @@ type globalMutator interface { applyGlobal(state *SectorInfo) bool } +type Ignorable interface { + Ignore() +} + // Global events type SectorRestart struct{} @@ -84,6 +88,8 @@ type SectorStartPacking struct{} func (evt SectorStartPacking) apply(*SectorInfo) {} +func (evt SectorStartPacking) Ignore() {} + type SectorPacked struct{ FillerPieces []abi.PieceInfo } func (evt SectorPacked) apply(state *SectorInfo) { diff --git a/sealing.go b/sealing.go index 358bc9658..f5f3fb480 100644 --- a/sealing.go +++ b/sealing.go @@ -4,6 +4,7 @@ import ( "context" "io" "sync" + "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -59,6 +60,8 @@ type Sealing struct { upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} + + getSealDelay GetSealingDelayFunc } type UnsealedSectorInfo struct { @@ -67,7 +70,7 @@ type UnsealedSectorInfo struct { pieceSizes []abi.UnpaddedPieceSize } -func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy) *Sealing { +func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gsd GetSealingDelayFunc) *Sealing { s := &Sealing{ api: api, events: events, @@ -79,7 +82,8 @@ func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batc unsealedInfos: make(map[abi.SectorNumber]UnsealedSectorInfo), pcp: pcp, - toUpgrade: map[abi.SectorNumber]struct{}{}, + toUpgrade: map[abi.SectorNumber]struct{}{}, + getSealDelay: gsd, } s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) @@ -205,6 +209,19 @@ func (m *Sealing) newSector() (abi.SectorNumber, error) { return 0, xerrors.Errorf("starting the sector fsm: %w", err) } + sd, err := m.getSealDelay() + if err != nil { + return 0, xerrors.Errorf("getting the sealing delay: %w", err) + } + + if sd > 0 { + timer := time.NewTimer(sd) + go func() { + <-timer.C + m.StartPacking(sid) + }() + } + m.unsealedInfos[sid] = UnsealedSectorInfo{ stored: 0, pieceSizes: nil, diff --git a/types.go b/types.go index 41e1e0954..4bcb6f24a 100644 --- a/types.go +++ b/types.go @@ -3,6 +3,7 @@ package sealing import ( "bytes" "context" + "time" "github.com/ipfs/go-cid" @@ -159,6 +160,8 @@ type MessageReceipt struct { GasUsed int64 } +type GetSealingDelayFunc func() (time.Duration, error) + func (mr *MessageReceipt) Equals(o *MessageReceipt) bool { return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed }