storageminer: States for common failure modes

This commit is contained in:
Łukasz Magiera 2019-12-04 00:59:40 +01:00
parent dbcb839b6f
commit 13c39452c1
6 changed files with 62 additions and 27 deletions

View File

@ -21,6 +21,11 @@ const (
Committing Committing
Proving Proving
SealFailed
PreCommitFailed
SealCommitFailed
CommitFailed
FailedUnrecoverable FailedUnrecoverable
) )
@ -33,6 +38,12 @@ var SectorStates = []string{
PreCommitted: "PreCommitted", PreCommitted: "PreCommitted",
Committing: "Committing", Committing: "Committing",
Proving: "Proving", Proving: "Proving",
SealFailed: "SealFailed",
PreCommitFailed: "PreCommitFailed",
SealCommitFailed: "SealCommitFailed",
CommitFailed: "CommitFailed",
FailedUnrecoverable: "FailedUnrecoverable", FailedUnrecoverable: "FailedUnrecoverable",
} }

View File

@ -4,6 +4,8 @@ package build
import "os" import "os"
var SectorSizes = []uint64{1024}
// Seconds // Seconds
const BlockDelay = 6 const BlockDelay = 6

View File

@ -128,7 +128,7 @@ func sectorsInfo(ctx context.Context, napi api.StorageMiner) (map[string]int, er
return nil, err return nil, err
} }
out[api.SectorStateStr(st.State)]++ out[api.SectorStates[st.State]]++
} }
return out, nil return out, nil

View File

@ -61,7 +61,7 @@ var sectorsStatusCmd = &cli.Command{
} }
fmt.Printf("SectorID:\t%d\n", status.SectorID) fmt.Printf("SectorID:\t%d\n", status.SectorID)
fmt.Printf("Status:\t%s\n", api.SectorStateStr(status.State)) fmt.Printf("Status:\t%s\n", api.SectorStates[status.State])
fmt.Printf("CommD:\t\t%x\n", status.CommD) fmt.Printf("CommD:\t\t%x\n", status.CommD)
fmt.Printf("CommR:\t\t%x\n", status.CommR) fmt.Printf("CommR:\t\t%x\n", status.CommR)
fmt.Printf("Ticket:\t\t%x\n", status.Ticket.TicketBytes) fmt.Printf("Ticket:\t\t%x\n", status.Ticket.TicketBytes)
@ -132,7 +132,7 @@ var sectorsListCmd = &cli.Command{
fmt.Printf("%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n", fmt.Printf("%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n",
s, s,
api.SectorStateStr(st.State), api.SectorStates[st.State],
yesno(inSSet), yesno(inSSet),
yesno(inPSet), yesno(inPSet),
st.Ticket.BlockHeight, st.Ticket.BlockHeight,

View File

@ -71,7 +71,7 @@ func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUp
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos()) rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
if err != nil { if err != nil {
return sector.upd().fatal(xerrors.Errorf("seal pre commit failed: %w", err)) return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
} }
return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) { return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
@ -97,7 +97,7 @@ func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sec
} }
enc, aerr := actors.SerializeParams(params) enc, aerr := actors.SerializeParams(params)
if aerr != nil { if aerr != nil {
return sector.upd().fatal(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
} }
msg := &types.Message{ msg := &types.Message{
@ -113,7 +113,7 @@ func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sec
log.Info("submitting precommit for sector: ", sector.SectorID) log.Info("submitting precommit for sector: ", sector.SectorID)
smsg, err := m.api.MpoolPushMessage(ctx, msg) smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
return sector.upd().fatal(xerrors.Errorf("pushing message to mpool: %w", err)) return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
} }
return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) { return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
@ -127,12 +127,12 @@ func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sect
log.Info("Sector precommitted: ", sector.SectorID) log.Info("Sector precommitted: ", sector.SectorID)
mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage) mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
if err != nil { if err != nil {
return sector.upd().fatal(err) return sector.upd().to(api.PreCommitFailed).error(err)
} }
if mw.Receipt.ExitCode != 0 { if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode) log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
return sector.upd().fatal(err) return sector.upd().to(api.PreCommitFailed).error(err)
} }
log.Info("precommit message landed on chain: ", sector.SectorID) log.Info("precommit message landed on chain: ", sector.SectorID)
@ -172,7 +172,7 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
if err != nil { if err != nil {
return sector.upd().fatal(xerrors.Errorf("computing seal proof failed: %w", err)) return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
} }
// TODO: Consider splitting states and persist proof for faster recovery // TODO: Consider splitting states and persist proof for faster recovery
@ -185,7 +185,7 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector
enc, aerr := actors.SerializeParams(params) enc, aerr := actors.SerializeParams(params)
if aerr != nil { if aerr != nil {
return sector.upd().fatal(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
} }
msg := &types.Message{ msg := &types.Message{
@ -200,14 +200,14 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector
smsg, err := m.api.MpoolPushMessage(ctx, msg) smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
log.Error(xerrors.Errorf("pushing message to mpool: %w", err)) return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
} }
// TODO: Separate state before this wait, so we persist message cid? // TODO: Separate state before this wait, so we persist message cid?
mw, err := m.api.StateWaitMsg(ctx, smsg.Cid()) mw, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil { if err != nil {
return sector.upd().fatal(xerrors.Errorf("failed to wait for porep inclusion: %w", err)) return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err))
} }
if mw.Receipt.ExitCode != 0 { if mw.Receipt.ExitCode != 0 {

View File

@ -27,6 +27,15 @@ func (u *sectorUpdate) fatal(err error) *sectorUpdate {
} }
} }
func (u *sectorUpdate) error(err error) *sectorUpdate {
return &sectorUpdate{
newState: u.newState,
id: u.id,
err: err,
mut: u.mut,
}
}
func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate { func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate {
return &sectorUpdate{ return &sectorUpdate{
newState: u.newState, newState: u.newState,
@ -161,23 +170,23 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
/* /*
*<- Empty * Empty
| | | |
| v | v
*<- Packing <- incoming *<- Packing <- incoming
| | | |
| v | v
*<- Unsealed *<- Unsealed <--> SealFailed
| | | |
| v | v
*<- PreCommitting * PreCommitting <--> PreCommitFailed
| | ^
| v |
*<- PreCommitted ------/
| | | |
| v | v v--> SealCommitFailed
*<- PreCommitted
| |
| v
*<- Committing *<- Committing
| | | | ^--> CommitFailed
| v | v
*<- Proving *<- Proving
| |
@ -191,6 +200,7 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
*/ */
switch update.newState { switch update.newState {
// Happy path
case api.Packing: case api.Packing:
m.handleSectorUpdate(ctx, sector, m.handlePacking) m.handleSectorUpdate(ctx, sector, m.handlePacking)
case api.Unsealed: case api.Unsealed:
@ -204,6 +214,18 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
case api.Proving: case api.Proving:
// TODO: track sector health / expiration // TODO: track sector health / expiration
log.Infof("Proving sector %d", update.id) log.Infof("Proving sector %d", update.id)
// Handled failure modes
case api.SealFailed:
log.Warn("sector %d entered unimplemented state 'SealFailed'", update.id)
case api.PreCommitFailed:
log.Warn("sector %d entered unimplemented state 'PreCommitFailed'", update.id)
case api.SealCommitFailed:
log.Warn("sector %d entered unimplemented state 'SealCommitFailed'", update.id)
case api.CommitFailed:
log.Warn("sector %d entered unimplemented state 'CommitFailed'", update.id)
// Fatal errors
case api.UndefinedSectorState: case api.UndefinedSectorState:
log.Error("sector update with undefined state!") log.Error("sector update with undefined state!")
case api.FailedUnrecoverable: case api.FailedUnrecoverable: