Add a timer to auto-start sector sealing
This commit is contained in:
parent
171e0d0e4b
commit
b838ed4e85
5
fsm.go
5
fsm.go
@ -370,6 +370,11 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, ok := events[0].User.(Ignorable)
|
||||||
|
if ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
|
return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,10 @@ type globalMutator interface {
|
|||||||
applyGlobal(state *SectorInfo) bool
|
applyGlobal(state *SectorInfo) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Ignorable interface {
|
||||||
|
Ignore()
|
||||||
|
}
|
||||||
|
|
||||||
// Global events
|
// Global events
|
||||||
|
|
||||||
type SectorRestart struct{}
|
type SectorRestart struct{}
|
||||||
@ -84,6 +88,8 @@ type SectorStartPacking struct{}
|
|||||||
|
|
||||||
func (evt SectorStartPacking) apply(*SectorInfo) {}
|
func (evt SectorStartPacking) apply(*SectorInfo) {}
|
||||||
|
|
||||||
|
func (evt SectorStartPacking) Ignore() {}
|
||||||
|
|
||||||
type SectorPacked struct{ FillerPieces []abi.PieceInfo }
|
type SectorPacked struct{ FillerPieces []abi.PieceInfo }
|
||||||
|
|
||||||
func (evt SectorPacked) apply(state *SectorInfo) {
|
func (evt SectorPacked) apply(state *SectorInfo) {
|
||||||
|
21
sealing.go
21
sealing.go
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
@ -59,6 +60,8 @@ type Sealing struct {
|
|||||||
|
|
||||||
upgradeLk sync.Mutex
|
upgradeLk sync.Mutex
|
||||||
toUpgrade map[abi.SectorNumber]struct{}
|
toUpgrade map[abi.SectorNumber]struct{}
|
||||||
|
|
||||||
|
getSealDelay GetSealingDelayFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
type UnsealedSectorInfo struct {
|
type UnsealedSectorInfo struct {
|
||||||
@ -67,7 +70,7 @@ type UnsealedSectorInfo struct {
|
|||||||
pieceSizes []abi.UnpaddedPieceSize
|
pieceSizes []abi.UnpaddedPieceSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy) *Sealing {
|
func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gsd GetSealingDelayFunc) *Sealing {
|
||||||
s := &Sealing{
|
s := &Sealing{
|
||||||
api: api,
|
api: api,
|
||||||
events: events,
|
events: events,
|
||||||
@ -79,7 +82,8 @@ func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batc
|
|||||||
unsealedInfos: make(map[abi.SectorNumber]UnsealedSectorInfo),
|
unsealedInfos: make(map[abi.SectorNumber]UnsealedSectorInfo),
|
||||||
pcp: pcp,
|
pcp: pcp,
|
||||||
|
|
||||||
toUpgrade: map[abi.SectorNumber]struct{}{},
|
toUpgrade: map[abi.SectorNumber]struct{}{},
|
||||||
|
getSealDelay: gsd,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
||||||
@ -205,6 +209,19 @@ func (m *Sealing) newSector() (abi.SectorNumber, error) {
|
|||||||
return 0, xerrors.Errorf("starting the sector fsm: %w", err)
|
return 0, xerrors.Errorf("starting the sector fsm: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sd, err := m.getSealDelay()
|
||||||
|
if err != nil {
|
||||||
|
return 0, xerrors.Errorf("getting the sealing delay: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if sd > 0 {
|
||||||
|
timer := time.NewTimer(sd)
|
||||||
|
go func() {
|
||||||
|
<-timer.C
|
||||||
|
m.StartPacking(sid)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
m.unsealedInfos[sid] = UnsealedSectorInfo{
|
m.unsealedInfos[sid] = UnsealedSectorInfo{
|
||||||
stored: 0,
|
stored: 0,
|
||||||
pieceSizes: nil,
|
pieceSizes: nil,
|
||||||
|
3
types.go
3
types.go
@ -3,6 +3,7 @@ package sealing
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
@ -159,6 +160,8 @@ type MessageReceipt struct {
|
|||||||
GasUsed int64
|
GasUsed int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type GetSealingDelayFunc func() (time.Duration, error)
|
||||||
|
|
||||||
func (mr *MessageReceipt) Equals(o *MessageReceipt) bool {
|
func (mr *MessageReceipt) Equals(o *MessageReceipt) bool {
|
||||||
return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed
|
return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user