Port sector state maching to evtsm
This commit is contained in:
parent
da2a11ed76
commit
f6d41ee77d
@ -24,7 +24,7 @@ func (t *testHandler) Plan(events []Event, state interface{}) (interface{}, erro
|
|||||||
return t.plan(events, state.(*TestState))
|
return t.plan(events, state.(*TestState))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *testHandler) plan(events []Event, state *TestState) (interface{}, error) {
|
func (t *testHandler) plan(events []Event, state *TestState) (func(Context, TestState) error, error) {
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
e := event.User.(*TestEvent)
|
e := event.User.(*TestEvent)
|
||||||
switch e.A {
|
switch e.A {
|
||||||
|
@ -97,3 +97,11 @@ func (s *Sched) Stop(ctx context.Context) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Sched) List(out interface{}) error {
|
||||||
|
return s.sts.List(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Sched) Get(i interface{}) *statestore.StoredState {
|
||||||
|
return s.sts.Get(i)
|
||||||
|
}
|
||||||
|
@ -205,7 +205,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state api.SectorState) error {
|
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
return sm.Miner.UpdateSectorState(ctx, id, storage.NonceIncrement, state)
|
return sm.Miner.ForceSectorState(ctx, id, state)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
|
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
|
||||||
|
@ -3,11 +3,13 @@ package storage
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/filecoin-project/lotus/lib/evtsm"
|
||||||
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -20,7 +22,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
"github.com/filecoin-project/lotus/lib/statestore"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("storageminer")
|
var log = logging.Logger("storageminer")
|
||||||
@ -37,11 +38,10 @@ type Miner struct {
|
|||||||
|
|
||||||
// Sealing
|
// Sealing
|
||||||
sb *sectorbuilder.SectorBuilder
|
sb *sectorbuilder.SectorBuilder
|
||||||
sectors *statestore.StateStore
|
sectors *evtsm.Sched
|
||||||
tktFn TicketFn
|
tktFn TicketFn
|
||||||
|
|
||||||
sectorIncoming chan *SectorInfo
|
sectorIncoming chan *SectorInfo
|
||||||
sectorUpdated chan sectorUpdate
|
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
}
|
}
|
||||||
@ -72,7 +72,7 @@ type storageMinerApi interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder, tktFn TicketFn) (*Miner, error) {
|
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder, tktFn TicketFn) (*Miner, error) {
|
||||||
return &Miner{
|
m := &Miner{
|
||||||
api: api,
|
api: api,
|
||||||
|
|
||||||
maddr: addr,
|
maddr: addr,
|
||||||
@ -80,13 +80,15 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto
|
|||||||
sb: sb,
|
sb: sb,
|
||||||
tktFn: tktFn,
|
tktFn: tktFn,
|
||||||
|
|
||||||
sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix))),
|
|
||||||
|
|
||||||
sectorIncoming: make(chan *SectorInfo),
|
sectorIncoming: make(chan *SectorInfo),
|
||||||
sectorUpdated: make(chan sectorUpdate),
|
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
stopped: make(chan struct{}),
|
stopped: make(chan struct{}),
|
||||||
}, nil
|
}
|
||||||
|
|
||||||
|
// TODO: separate sector stuff from miner struct
|
||||||
|
m.sectors = evtsm.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), m, reflect.TypeOf(SectorInfo{}))
|
||||||
|
|
||||||
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) Run(ctx context.Context) error {
|
func (m *Miner) Run(ctx context.Context) error {
|
||||||
@ -104,15 +106,17 @@ func (m *Miner) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go fps.run(ctx)
|
go fps.run(ctx)
|
||||||
if err := m.sectorStateLoop(ctx); err != nil {
|
if err := m.restartSectors(ctx); err != nil {
|
||||||
log.Errorf("%+v", err)
|
log.Errorf("%+v", err)
|
||||||
return xerrors.Errorf("failed to startup sector state loop: %w", err)
|
return xerrors.Errorf("failed load sector states: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) Stop(ctx context.Context) error {
|
func (m *Miner) Stop(ctx context.Context) error {
|
||||||
|
defer m.sectors.Stop(ctx)
|
||||||
|
|
||||||
close(m.stop)
|
close(m.stop)
|
||||||
select {
|
select {
|
||||||
case <-m.stopped:
|
case <-m.stopped:
|
||||||
|
263
storage/sector_2.go
Normal file
263
storage/sector_2.go
Normal file
@ -0,0 +1,263 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/lib/evtsm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SectorStart struct {
|
||||||
|
id uint64
|
||||||
|
pieces []Piece
|
||||||
|
}
|
||||||
|
type SectorRestart struct{}
|
||||||
|
|
||||||
|
type SectorFatalError struct{ error }
|
||||||
|
|
||||||
|
type SectorPacked struct{ pieces []Piece }
|
||||||
|
|
||||||
|
type SectorSealed struct {
|
||||||
|
commR []byte
|
||||||
|
commD []byte
|
||||||
|
ticket SealTicket
|
||||||
|
}
|
||||||
|
type SectorSealFailed struct{ error }
|
||||||
|
|
||||||
|
type SectorPreCommitFailed struct{ error }
|
||||||
|
type SectorPreCommitted struct {
|
||||||
|
message cid.Cid
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorSeedReady struct {
|
||||||
|
seed SealSeed
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorSealCommitFailed struct{ error }
|
||||||
|
type SectorCommitFailed struct{ error }
|
||||||
|
type SectorCommitted struct {
|
||||||
|
message cid.Cid
|
||||||
|
proof []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorProving struct{}
|
||||||
|
|
||||||
|
type SectorFaultReported struct{ reportMsg cid.Cid }
|
||||||
|
type SectorFaultedFinal struct{}
|
||||||
|
|
||||||
|
type SectorForceState struct {
|
||||||
|
state api.SectorState
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) Plan(events []evtsm.Event, user interface{}) (interface{}, error) {
|
||||||
|
next, err := m.plan(events, user.(*SectorInfo))
|
||||||
|
if err != nil || next == nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(ctx evtsm.Context, si SectorInfo) error {
|
||||||
|
err := next(ctx, si)
|
||||||
|
if err != nil {
|
||||||
|
if err := ctx.Send(SectorFatalError{error: err}); err != nil {
|
||||||
|
return xerrors.Errorf("error while sending error: reporting %+v: %w", err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) plan(events []evtsm.Event, state *SectorInfo) (func(evtsm.Context, SectorInfo) error, error) {
|
||||||
|
/////
|
||||||
|
// First process all events
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
if err, ok := event.User.(error); ok {
|
||||||
|
state.LastErr = fmt.Sprintf("%+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch event := event.User.(type) {
|
||||||
|
case SectorStart:
|
||||||
|
// TODO: check if state is clean
|
||||||
|
state.SectorID = event.id
|
||||||
|
state.Pieces = event.pieces
|
||||||
|
state.State = api.Packing
|
||||||
|
|
||||||
|
case SectorRestart:
|
||||||
|
// noop
|
||||||
|
case SectorFatalError:
|
||||||
|
log.Errorf("Fatal error on sector %d: %+v", state.SectorID, event.error)
|
||||||
|
// TODO: Do we want to mark the state as unrecoverable?
|
||||||
|
// I feel like this should be a softer error, where the user would
|
||||||
|
// be able to send a retry event of some kind
|
||||||
|
return nil, nil
|
||||||
|
|
||||||
|
// // TODO: Incoming
|
||||||
|
// TODO: for those - look at dealIDs matching chain
|
||||||
|
|
||||||
|
// //
|
||||||
|
// Packing
|
||||||
|
|
||||||
|
case SectorPacked:
|
||||||
|
// TODO: assert state
|
||||||
|
state.Pieces = append(state.Pieces, event.pieces...)
|
||||||
|
state.State = api.Unsealed
|
||||||
|
|
||||||
|
// // Unsealed
|
||||||
|
|
||||||
|
case SectorSealFailed:
|
||||||
|
// TODO: try to find out the reason, maybe retry
|
||||||
|
state.State = api.SealFailed
|
||||||
|
|
||||||
|
case SectorSealed:
|
||||||
|
state.CommD = event.commD
|
||||||
|
state.CommR = event.commR
|
||||||
|
state.Ticket = event.ticket
|
||||||
|
state.State = api.PreCommitting
|
||||||
|
|
||||||
|
// // PreCommit
|
||||||
|
|
||||||
|
case SectorPreCommitFailed:
|
||||||
|
// TODO: try to find out the reason, maybe retry
|
||||||
|
state.State = api.PreCommitFailed
|
||||||
|
case SectorPreCommitted:
|
||||||
|
state.PreCommitMessage = &event.message
|
||||||
|
state.State = api.PreCommitted
|
||||||
|
|
||||||
|
case SectorSeedReady:
|
||||||
|
state.Seed = event.seed
|
||||||
|
state.State = api.Committing
|
||||||
|
|
||||||
|
// // Commit
|
||||||
|
|
||||||
|
case SectorSealCommitFailed:
|
||||||
|
// TODO: try to find out the reason, maybe retry
|
||||||
|
state.State = api.SealCommitFailed
|
||||||
|
case SectorCommitFailed:
|
||||||
|
// TODO: try to find out the reason, maybe retry
|
||||||
|
state.State = api.SealFailed
|
||||||
|
case SectorCommitted:
|
||||||
|
state.Proof = event.proof
|
||||||
|
state.State = api.CommitWait
|
||||||
|
case SectorProving:
|
||||||
|
state.State = api.Proving
|
||||||
|
|
||||||
|
case SectorFaultReported:
|
||||||
|
state.FaultReportMsg = &event.reportMsg
|
||||||
|
state.State = api.FaultReported
|
||||||
|
case SectorFaultedFinal:
|
||||||
|
state.State = api.FaultedFinal
|
||||||
|
|
||||||
|
// // Debug triggers
|
||||||
|
case SectorForceState:
|
||||||
|
state.State = event.state
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/////
|
||||||
|
// Now decide what to do next
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
* Empty
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Packing <- incoming
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Unsealed <--> SealFailed
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
* PreCommitting <--> PreCommitFailed
|
||||||
|
| | ^
|
||||||
|
| v |
|
||||||
|
*<- PreCommitted ------/
|
||||||
|
| |||
|
||||||
|
| vvv v--> SealCommitFailed
|
||||||
|
*<- Committing
|
||||||
|
| | ^--> CommitFailed
|
||||||
|
| v ^
|
||||||
|
*<- CommitWait ---/
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Proving
|
||||||
|
|
|
||||||
|
v
|
||||||
|
FailedUnrecoverable
|
||||||
|
|
||||||
|
UndefinedSectorState <- ¯\_(ツ)_/¯
|
||||||
|
| ^
|
||||||
|
*---------------------/
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
switch state.State {
|
||||||
|
// Happy path
|
||||||
|
case api.Packing:
|
||||||
|
return m.handlePacking, nil
|
||||||
|
case api.Unsealed:
|
||||||
|
return m.handleUnsealed, nil
|
||||||
|
case api.PreCommitting:
|
||||||
|
return m.handlePreCommitting, nil
|
||||||
|
case api.PreCommitted:
|
||||||
|
return m.handlePreCommitted, nil
|
||||||
|
case api.Committing:
|
||||||
|
return m.handleCommitting, nil
|
||||||
|
case api.CommitWait:
|
||||||
|
return m.handleCommitWait, nil
|
||||||
|
case api.Proving:
|
||||||
|
// TODO: track sector health / expiration
|
||||||
|
log.Infof("Proving sector %d", state.SectorID)
|
||||||
|
|
||||||
|
// Handled failure modes
|
||||||
|
case api.SealFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'SealFailed'", state.SectorID)
|
||||||
|
case api.PreCommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", state.SectorID)
|
||||||
|
case api.SealCommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", state.SectorID)
|
||||||
|
case api.CommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'CommitFailed'", state.SectorID)
|
||||||
|
|
||||||
|
// Faults
|
||||||
|
case api.Faulty:
|
||||||
|
return m.handleFaulty, nil
|
||||||
|
case api.FaultReported:
|
||||||
|
return m.handleFaultReported, nil
|
||||||
|
|
||||||
|
// Fatal errors
|
||||||
|
case api.UndefinedSectorState:
|
||||||
|
log.Error("sector update with undefined state!")
|
||||||
|
case api.FailedUnrecoverable:
|
||||||
|
log.Errorf("sector %d failed unrecoverably", state.SectorID)
|
||||||
|
default:
|
||||||
|
log.Errorf("unexpected sector update state: %d", state.State)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) restartSectors(ctx context.Context) error {
|
||||||
|
trackedSectors, err := m.ListSectors()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("loading sector list: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sector := range trackedSectors {
|
||||||
|
if err := m.sectors.Send(sector.SectorID, SectorRestart{}); err != nil {
|
||||||
|
log.Errorf("restarting sector %d: %+v", sector.SectorID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Grab on-chain sector set and diff with trackedSectors
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
|
return m.sectors.Send(id, SectorForceState{state})
|
||||||
|
}
|
@ -5,31 +5,14 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/evtsm"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
)
|
)
|
||||||
|
|
||||||
type providerHandlerFunc func(ctx context.Context, deal SectorInfo) *sectorUpdate
|
func (m *Miner) handlePacking(ctx evtsm.Context, sector SectorInfo) error {
|
||||||
|
|
||||||
func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc) {
|
|
||||||
go func() {
|
|
||||||
update := cb(ctx, sector)
|
|
||||||
|
|
||||||
if update == nil {
|
|
||||||
return // async
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case m.sectorUpdated <- *update:
|
|
||||||
case <-m.stop:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
|
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
|
||||||
|
|
||||||
var allocated uint64
|
var allocated uint64
|
||||||
@ -40,51 +23,49 @@ func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpd
|
|||||||
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
|
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
|
||||||
|
|
||||||
if allocated > ubytes {
|
if allocated > ubytes {
|
||||||
return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes))
|
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
fillerSizes, err := fillersFromRem(ubytes - allocated)
|
fillerSizes, err := fillersFromRem(ubytes - allocated)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().fatal(err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(fillerSizes) > 0 {
|
if len(fillerSizes) > 0 {
|
||||||
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
|
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
|
||||||
}
|
}
|
||||||
|
|
||||||
pieces, err := m.pledgeSector(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...)
|
pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err))
|
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) {
|
return ctx.Send(SectorPacked{pieces: pieces})
|
||||||
info.Pieces = append(info.Pieces, pieces...)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
func (m *Miner) handleUnsealed(ctx evtsm.Context, sector SectorInfo) error {
|
||||||
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
||||||
ticket, err := m.tktFn(ctx)
|
ticket, err := m.tktFn(ctx.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().fatal(err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
|
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
|
return ctx.Send(SectorSealed{
|
||||||
info.CommD = rspco.CommD[:]
|
commD: rspco.CommD[:],
|
||||||
info.CommR = rspco.CommR[:]
|
commR: rspco.CommR[:],
|
||||||
info.Ticket = SealTicket{
|
ticket: SealTicket{
|
||||||
BlockHeight: ticket.BlockHeight,
|
BlockHeight: ticket.BlockHeight,
|
||||||
TicketBytes: ticket.TicketBytes[:],
|
TicketBytes: ticket.TicketBytes[:],
|
||||||
}
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
func (m *Miner) handlePreCommitting(ctx evtsm.Context, sector SectorInfo) error {
|
||||||
params := &actors.SectorPreCommitInfo{
|
params := &actors.SectorPreCommitInfo{
|
||||||
SectorNumber: sector.SectorID,
|
SectorNumber: sector.SectorID,
|
||||||
|
|
||||||
@ -94,7 +75,7 @@ func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sec
|
|||||||
}
|
}
|
||||||
enc, aerr := actors.SerializeParams(params)
|
enc, aerr := actors.SerializeParams(params)
|
||||||
if aerr != nil {
|
if aerr != nil {
|
||||||
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
@ -108,54 +89,45 @@ func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sec
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Info("submitting precommit for sector: ", sector.SectorID)
|
log.Info("submitting precommit for sector: ", sector.SectorID)
|
||||||
smsg, err := m.api.MpoolPushMessage(ctx, msg)
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
|
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
|
||||||
mcid := smsg.Cid()
|
|
||||||
info.PreCommitMessage = &mcid
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
func (m *Miner) handlePreCommitted(ctx evtsm.Context, sector SectorInfo) error {
|
||||||
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
|
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
|
||||||
log.Info("Sector precommitted: ", sector.SectorID)
|
log.Info("Sector precommitted: ", sector.SectorID)
|
||||||
mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().to(api.PreCommitFailed).error(err)
|
return ctx.Send(SectorPreCommitFailed{err})
|
||||||
}
|
}
|
||||||
|
|
||||||
if mw.Receipt.ExitCode != 0 {
|
if mw.Receipt.ExitCode != 0 {
|
||||||
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
|
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
|
||||||
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
|
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
|
||||||
return sector.upd().to(api.PreCommitFailed).error(err)
|
return ctx.Send(SectorPreCommitFailed{err})
|
||||||
}
|
}
|
||||||
log.Info("precommit message landed on chain: ", sector.SectorID)
|
log.Info("precommit message landed on chain: ", sector.SectorID)
|
||||||
|
|
||||||
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
|
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
|
||||||
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
|
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
|
||||||
|
|
||||||
updateNonce := sector.Nonce
|
err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH uint64) error {
|
||||||
|
rand, err := m.api.ChainGetRandomness(ectx, ts.Key(), int64(randHeight))
|
||||||
err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
|
|
||||||
rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
|
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
|
||||||
|
|
||||||
m.sectorUpdated <- *sector.upd().fatal(err)
|
ctx.Send(SectorFatalError{error: err})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
m.sectorUpdated <- *sector.upd().to(api.Committing).setNonce(updateNonce).state(func(info *SectorInfo) {
|
ctx.Send(SectorSeedReady{seed: SealSeed{
|
||||||
info.Seed = SealSeed{
|
|
||||||
BlockHeight: randHeight,
|
BlockHeight: randHeight,
|
||||||
TicketBytes: rand,
|
TicketBytes: rand,
|
||||||
}
|
}})
|
||||||
})
|
|
||||||
|
|
||||||
updateNonce++
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}, func(ctx context.Context, ts *types.TipSet) error {
|
}, func(ctx context.Context, ts *types.TipSet) error {
|
||||||
@ -170,12 +142,12 @@ func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sect
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
func (m *Miner) handleCommitting(ctx evtsm.Context, sector SectorInfo) error {
|
||||||
log.Info("scheduling seal proof computation...")
|
log.Info("scheduling seal proof computation...")
|
||||||
|
|
||||||
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
|
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
|
return ctx.Send(SectorSealCommitFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Consider splitting states and persist proof for faster recovery
|
// TODO: Consider splitting states and persist proof for faster recovery
|
||||||
@ -188,7 +160,7 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector
|
|||||||
|
|
||||||
enc, aerr := actors.SerializeParams(params)
|
enc, aerr := actors.SerializeParams(params)
|
||||||
if aerr != nil {
|
if aerr != nil {
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
@ -201,40 +173,37 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector
|
|||||||
GasPrice: types.NewInt(1),
|
GasPrice: types.NewInt(1),
|
||||||
}
|
}
|
||||||
|
|
||||||
smsg, err := m.api.MpoolPushMessage(ctx, msg)
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Separate state before this wait, so we persist message cid?
|
return ctx.Send(SectorCommitted{
|
||||||
return sector.upd().to(api.CommitWait).state(func(info *SectorInfo) {
|
proof: proof,
|
||||||
mcid := smsg.Cid()
|
message: smsg.Cid(),
|
||||||
info.CommitMessage = &mcid
|
|
||||||
info.Proof = proof
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleCommitWait(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
func (m *Miner) handleCommitWait(ctx evtsm.Context, sector SectorInfo) error {
|
||||||
if sector.CommitMessage == nil {
|
if sector.CommitMessage == nil {
|
||||||
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
|
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("entered commit wait with no commit cid"))
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")})
|
||||||
}
|
}
|
||||||
|
|
||||||
mw, err := m.api.StateWaitMsg(ctx, *sector.CommitMessage)
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.CommitMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err))
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
if mw.Receipt.ExitCode != 0 {
|
if mw.Receipt.ExitCode != 0 {
|
||||||
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
|
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
|
||||||
return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode))
|
return xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
return sector.upd().to(api.Proving).state(func(info *SectorInfo) {
|
return ctx.Send(SectorProving{})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
func (m *Miner) handleFaulty(ctx evtsm.Context, sector SectorInfo) error {
|
||||||
// TODO: check if the fault has already been reported, and that this sector is even valid
|
// TODO: check if the fault has already been reported, and that this sector is even valid
|
||||||
|
|
||||||
// TODO: coalesce faulty sector reporting
|
// TODO: coalesce faulty sector reporting
|
||||||
@ -245,7 +214,7 @@ func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpda
|
|||||||
_ = fp
|
_ = fp
|
||||||
enc, aerr := actors.SerializeParams(nil)
|
enc, aerr := actors.SerializeParams(nil)
|
||||||
if aerr != nil {
|
if aerr != nil {
|
||||||
return sector.upd().fatal(xerrors.Errorf("failed to serialize declare fault params: %w", aerr))
|
return xerrors.Errorf("failed to serialize declare fault params: %w", aerr)
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
@ -258,32 +227,28 @@ func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpda
|
|||||||
GasPrice: types.NewInt(1),
|
GasPrice: types.NewInt(1),
|
||||||
}
|
}
|
||||||
|
|
||||||
smsg, err := m.api.MpoolPushMessage(ctx, msg)
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("failed to push declare faults message to network: %w", err))
|
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return sector.upd().to(api.FaultReported).state(func(info *SectorInfo) {
|
return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()})
|
||||||
c := smsg.Cid()
|
|
||||||
info.FaultReportMsg = &c
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleFaultReported(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
func (m *Miner) handleFaultReported(ctx evtsm.Context, sector SectorInfo) error {
|
||||||
if sector.FaultReportMsg == nil {
|
if sector.FaultReportMsg == nil {
|
||||||
return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("entered fault reported state without a FaultReportMsg cid"))
|
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
|
||||||
}
|
}
|
||||||
|
|
||||||
mw, err := m.api.StateWaitMsg(ctx, *sector.FaultReportMsg)
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for fault declaration: %w", err))
|
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mw.Receipt.ExitCode != 0 {
|
if mw.Receipt.ExitCode != 0 {
|
||||||
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
|
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
|
||||||
return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode))
|
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
return sector.upd().to(api.FaultedFinal).state(func(info *SectorInfo) {})
|
return ctx.Send(SectorFaultedFinal{})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -76,10 +76,6 @@ type SectorInfo struct {
|
|||||||
LastErr string
|
LastErr string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *SectorInfo) upd() *sectorUpdate {
|
|
||||||
return §orUpdate{id: t.SectorID, nonce: t.Nonce}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
|
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
|
||||||
out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces))
|
out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces))
|
||||||
for i, piece := range t.Pieces {
|
for i, piece := range t.Pieces {
|
||||||
|
@ -2,275 +2,13 @@ package storage
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"math"
|
|
||||||
|
|
||||||
xerrors "golang.org/x/xerrors"
|
xerrors "golang.org/x/xerrors"
|
||||||
|
"io"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/lib/padreader"
|
"github.com/filecoin-project/lotus/lib/padreader"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
)
|
)
|
||||||
|
|
||||||
const NonceIncrement = math.MaxUint64
|
|
||||||
|
|
||||||
type sectorUpdate struct {
|
|
||||||
newState api.SectorState
|
|
||||||
id uint64
|
|
||||||
err error
|
|
||||||
nonce uint64
|
|
||||||
mut func(*SectorInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) fatal(err error) *sectorUpdate {
|
|
||||||
u.newState = api.FailedUnrecoverable
|
|
||||||
u.err = err
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) error(err error) *sectorUpdate {
|
|
||||||
u.err = err
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate {
|
|
||||||
u.mut = m
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate {
|
|
||||||
u.newState = newState
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) setNonce(nc uint64) *sectorUpdate {
|
|
||||||
u.nonce = nc
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) UpdateSectorState(ctx context.Context, sector uint64, snonce uint64, state api.SectorState) error {
|
|
||||||
select {
|
|
||||||
case m.sectorUpdated <- sectorUpdate{
|
|
||||||
newState: state,
|
|
||||||
nonce: snonce,
|
|
||||||
id: sector,
|
|
||||||
}:
|
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) sectorStateLoop(ctx context.Context) error {
|
|
||||||
trackedSectors, err := m.ListSectors()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("loading sector list: %+v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for _, si := range trackedSectors {
|
|
||||||
select {
|
|
||||||
case m.sectorUpdated <- sectorUpdate{
|
|
||||||
newState: si.State,
|
|
||||||
nonce: si.Nonce,
|
|
||||||
id: si.SectorID,
|
|
||||||
err: nil,
|
|
||||||
mut: nil,
|
|
||||||
}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
log.Warn("didn't restart processing for all sectors: ", ctx.Err())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
{
|
|
||||||
// verify on-chain state
|
|
||||||
trackedByID := map[uint64]*SectorInfo{}
|
|
||||||
for _, si := range trackedSectors {
|
|
||||||
i := si
|
|
||||||
trackedByID[si.SectorID] = &i
|
|
||||||
}
|
|
||||||
|
|
||||||
curTs, err := m.api.ChainHead(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting chain head: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ps, err := m.api.StateMinerProvingSet(ctx, m.maddr, curTs)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting miner proving set: %w", err)
|
|
||||||
}
|
|
||||||
for _, ocs := range ps {
|
|
||||||
if _, ok := trackedByID[ocs.SectorID]; ok {
|
|
||||||
continue // TODO: check state
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: attempt recovery
|
|
||||||
log.Warnf("untracked sector %d found on chain", ocs.SectorID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer log.Warn("quitting deal provider loop")
|
|
||||||
defer close(m.stopped)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case sector := <-m.sectorIncoming:
|
|
||||||
m.onSectorIncoming(sector)
|
|
||||||
case update := <-m.sectorUpdated:
|
|
||||||
m.onSectorUpdated(ctx, update)
|
|
||||||
case <-m.stop:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) onSectorIncoming(sector *SectorInfo) {
|
|
||||||
has, err := m.sectors.Has(sector.SectorID)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if has {
|
|
||||||
log.Warnf("SealPiece called more than once for sector %d", sector.SectorID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
|
|
||||||
log.Errorf("sector tracking failed: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case m.sectorUpdated <- sectorUpdate{
|
|
||||||
newState: api.Packing,
|
|
||||||
id: sector.SectorID,
|
|
||||||
}:
|
|
||||||
case <-m.stop:
|
|
||||||
log.Warn("failed to send incoming sector update, miner shutting down")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
|
|
||||||
log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState])
|
|
||||||
var sector SectorInfo
|
|
||||||
err := m.sectors.Get(update.id).Mutate(func(s *SectorInfo) error {
|
|
||||||
if update.nonce < s.Nonce {
|
|
||||||
return xerrors.Errorf("update nonce too low, ignoring (%d < %d)", update.nonce, s.Nonce)
|
|
||||||
}
|
|
||||||
|
|
||||||
if update.nonce != NonceIncrement {
|
|
||||||
s.Nonce = update.nonce
|
|
||||||
} else {
|
|
||||||
s.Nonce++ // forced update
|
|
||||||
}
|
|
||||||
s.State = update.newState
|
|
||||||
if update.err != nil {
|
|
||||||
if s.LastErr != "" {
|
|
||||||
s.LastErr += "---------\n\n"
|
|
||||||
}
|
|
||||||
s.LastErr += fmt.Sprintf("entering state %s: %+v", api.SectorStates[update.newState], update.err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if update.mut != nil {
|
|
||||||
update.mut(s)
|
|
||||||
}
|
|
||||||
sector = *s
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if update.err != nil {
|
|
||||||
log.Errorf("sector %d failed: %+v", update.id, update.err)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("sector %d update error: %+v", update.id, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
|
|
||||||
* Empty
|
|
||||||
| |
|
|
||||||
| v
|
|
||||||
*<- Packing <- incoming
|
|
||||||
| |
|
|
||||||
| v
|
|
||||||
*<- Unsealed <--> SealFailed
|
|
||||||
| |
|
|
||||||
| v
|
|
||||||
* PreCommitting <--> PreCommitFailed
|
|
||||||
| | ^
|
|
||||||
| v |
|
|
||||||
*<- PreCommitted ------/
|
|
||||||
| |||
|
|
||||||
| vvv v--> SealCommitFailed
|
|
||||||
*<- Committing
|
|
||||||
| | ^--> CommitFailed
|
|
||||||
| v ^
|
|
||||||
*<- CommitWait ---/
|
|
||||||
| |
|
|
||||||
| v
|
|
||||||
*<- Proving
|
|
||||||
|
|
|
||||||
v
|
|
||||||
FailedUnrecoverable
|
|
||||||
|
|
||||||
UndefinedSectorState <- ¯\_(ツ)_/¯
|
|
||||||
| ^
|
|
||||||
*---------------------/
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
switch update.newState {
|
|
||||||
// Happy path
|
|
||||||
case api.Packing:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handlePacking)
|
|
||||||
case api.Unsealed:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleUnsealed)
|
|
||||||
case api.PreCommitting:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handlePreCommitting)
|
|
||||||
case api.PreCommitted:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handlePreCommitted)
|
|
||||||
case api.Committing:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleCommitting)
|
|
||||||
case api.CommitWait:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleCommitWait)
|
|
||||||
case api.Proving:
|
|
||||||
// TODO: track sector health / expiration
|
|
||||||
log.Infof("Proving sector %d", update.id)
|
|
||||||
|
|
||||||
// Handled failure modes
|
|
||||||
case api.SealFailed:
|
|
||||||
log.Warnf("sector %d entered unimplemented state 'SealFailed'", update.id)
|
|
||||||
case api.PreCommitFailed:
|
|
||||||
log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", update.id)
|
|
||||||
case api.SealCommitFailed:
|
|
||||||
log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", update.id)
|
|
||||||
case api.CommitFailed:
|
|
||||||
log.Warnf("sector %d entered unimplemented state 'CommitFailed'", update.id)
|
|
||||||
|
|
||||||
// Faults
|
|
||||||
case api.Faulty:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleFaulty)
|
|
||||||
case api.FaultReported:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleFaultReported)
|
|
||||||
|
|
||||||
// Fatal errors
|
|
||||||
case api.UndefinedSectorState:
|
|
||||||
log.Error("sector update with undefined state!")
|
|
||||||
case api.FailedUnrecoverable:
|
|
||||||
log.Errorf("sector %d failed unrecoverably", update.id)
|
|
||||||
default:
|
|
||||||
log.Errorf("unexpected sector update state: %d", update.newState)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
||||||
if padreader.PaddedSize(size) != size {
|
if padreader.PaddedSize(size) != size {
|
||||||
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
||||||
@ -297,10 +35,9 @@ func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorI
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error {
|
func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error {
|
||||||
si := &SectorInfo{
|
return m.sectors.Send(sid, SectorStart{
|
||||||
SectorID: sid,
|
id: sid,
|
||||||
|
pieces: []Piece{
|
||||||
Pieces: []Piece{
|
|
||||||
{
|
{
|
||||||
DealID: dealID,
|
DealID: dealID,
|
||||||
|
|
||||||
@ -308,11 +45,5 @@ func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi se
|
|||||||
CommP: ppi.CommP[:],
|
CommP: ppi.CommP[:],
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
})
|
||||||
select {
|
|
||||||
case m.sectorIncoming <- si:
|
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
|
||||||
return xerrors.Errorf("failed to submit sector for sealing, queue full: %w", ctx.Err())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user