lotus/storage/pipeline/fsm.go

885 lines
26 KiB
Go
Raw Permalink Normal View History

//go:generate go run ./gen
package sealing
import (
"bytes"
"context"
2020-03-22 20:44:27 +00:00
"encoding/json"
"errors"
2020-01-22 20:29:19 +00:00
"fmt"
"net/http"
"os"
"reflect"
2020-01-22 20:29:19 +00:00
"time"
"golang.org/x/xerrors"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
)
// errSectorRemoved is the sentinel error plan returns once a sector has reached
// the Removed state. Plan translates it into statemachine.ErrTerminated unless
// the LOTUS_KEEP_REMOVED_FSM_ACTIVE environment variable is set to "1".
var errSectorRemoved = errors.New("sector removed")
2020-03-06 18:59:08 +00:00
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
next, processed, err := m.plan(events, user.(*SectorInfo))
if err != nil || next == nil {
if err == errSectorRemoved && os.Getenv("LOTUS_KEEP_REMOVED_FSM_ACTIVE") != "1" {
return nil, processed, statemachine.ErrTerminated
}
l := Log{
Timestamp: uint64(time.Now().Unix()),
Message: fmt.Sprintf("state machine error: %s", err),
Kind: fmt.Sprintf("error;%T", err),
}
user.(*SectorInfo).logAppend(l)
return nil, processed, nil
}
return func(ctx statemachine.Context, si SectorInfo) error {
err := next(ctx, si)
if err != nil {
2020-04-06 22:31:33 +00:00
log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
return nil
}
return nil
}, processed, nil // TODO: This processed event count is not very correct
}
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){
// external import
ReceiveSector: planOne(
onReturning(SectorReceived{}),
),
2020-06-22 16:42:38 +00:00
// Sealing
2020-06-23 19:32:22 +00:00
UndefinedSectorState: planOne(
2021-01-18 13:26:03 +00:00
on(SectorStart{}, WaitDeals),
2020-06-23 19:32:22 +00:00
on(SectorStartCC{}, Packing),
on(SectorReceive{}, ReceiveSector),
2020-06-23 19:32:22 +00:00
),
2021-01-18 13:26:03 +00:00
Empty: planOne( // deprecated
on(SectorAddPiece{}, AddPiece),
on(SectorStartPacking{}, Packing),
),
WaitDeals: planOne(
2021-01-18 13:26:03 +00:00
on(SectorAddPiece{}, AddPiece),
on(SectorStartPacking{}, Packing),
),
2021-01-18 13:26:03 +00:00
AddPiece: planOne(
on(SectorPieceAdded{}, WaitDeals),
2021-01-18 20:59:34 +00:00
apply(SectorStartPacking{}),
apply(SectorAddPiece{}),
on(SectorAddPieceFailed{}, AddPieceFailed),
2021-01-18 13:26:03 +00:00
),
2020-10-13 19:35:29 +00:00
Packing: planOne(on(SectorPacked{}, GetTicket)),
GetTicket: planOne(
on(SectorTicket{}, PreCommit1),
on(SectorCommitFailed{}, CommitFailed),
),
2020-04-06 20:23:37 +00:00
PreCommit1: planOne(
on(SectorPreCommit1{}, PreCommit2),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
2020-09-29 07:57:36 +00:00
on(SectorOldTicket{}, GetTicket),
2020-04-03 16:54:01 +00:00
),
2020-04-06 20:23:37 +00:00
PreCommit2: planOne(
on(SectorPreCommit2{}, SubmitPreCommitBatch),
on(SectorSealPreCommit2Failed{}, SealPreCommit2Failed),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
),
2020-04-06 20:23:37 +00:00
PreCommitting: planOne(
2021-05-18 15:21:10 +00:00
on(SectorPreCommitBatch{}, SubmitPreCommitBatch),
2020-05-18 22:49:21 +00:00
on(SectorPreCommitted{}, PreCommitWait),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
2020-04-06 20:23:37 +00:00
on(SectorChainPreCommitFailed{}, PreCommitFailed),
2020-06-02 21:45:28 +00:00
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
2021-05-18 15:37:52 +00:00
SubmitPreCommitBatch: planOne(
on(SectorPreCommitBatchSent{}, PreCommitBatchWait),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
2021-05-18 15:37:52 +00:00
),
PreCommitBatchWait: planOne(
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorRetryPreCommit{}, PreCommitting),
),
2020-05-18 22:49:21 +00:00
PreCommitWait: planOne(
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorRetryPreCommit{}, PreCommitting),
2020-05-18 22:49:21 +00:00
),
2020-04-06 20:23:37 +00:00
WaitSeed: planOne(
on(SectorSeedReady{}, Committing),
on(SectorChainPreCommitFailed{}, PreCommitFailed),
),
Committing: planCommitting,
2021-06-11 09:41:28 +00:00
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizedAvailable{}, SubmitCommit),
2021-06-11 09:41:28 +00:00
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
on(SectorCommitSubmitted{}, CommitWait),
2021-03-10 15:16:44 +00:00
on(SectorSubmitCommitAggregate{}, SubmitCommitAggregate),
on(SectorCommitFailed{}, CommitFailed),
2021-03-10 15:16:44 +00:00
),
SubmitCommitAggregate: planOne(
on(SectorCommitAggregateSent{}, CommitAggregateWait),
2021-03-10 15:16:44 +00:00
on(SectorCommitFailed{}, CommitFailed),
on(SectorRetrySubmitCommit{}, SubmitCommit),
),
CommitWait: planOne(
on(SectorProving{}, FinalizeSector),
on(SectorCommitFailed{}, CommitFailed),
on(SectorRetrySubmitCommit{}, SubmitCommit),
2020-01-20 08:23:56 +00:00
),
2021-03-10 15:16:44 +00:00
CommitAggregateWait: planOne(
on(SectorProving{}, FinalizeSector),
on(SectorCommitFailed{}, CommitFailed),
on(SectorRetrySubmitCommit{}, SubmitCommit),
),
FinalizeSector: planOne(
onWithCB(SectorFinalized{}, Proving, maybeNotifyRemoteDone(true, "Proving")),
onWithCB(SectorFinalizedAvailable{}, Available, maybeNotifyRemoteDone(true, "Available")),
2020-06-03 21:42:13 +00:00
on(SectorFinalizeFailed{}, FinalizeFailed),
2020-01-29 21:25:06 +00:00
),
// Snap deals
SnapDealsWaitDeals: planOne(
on(SectorAddPiece{}, SnapDealsAddPiece),
on(SectorStartPacking{}, SnapDealsPacking),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
SnapDealsAddPiece: planOne(
on(SectorPieceAdded{}, SnapDealsWaitDeals),
apply(SectorStartPacking{}),
apply(SectorAddPiece{}),
on(SectorAddPieceFailed{}, SnapDealsAddPieceFailed),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
SnapDealsPacking: planOne(
on(SectorPacked{}, UpdateReplica),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
UpdateReplica: planOne(
on(SectorReplicaUpdate{}, ProveReplicaUpdate),
on(SectorUpdateReplicaFailed{}, ReplicaUpdateFailed),
on(SectorDealsExpired{}, SnapDealsDealsExpired),
on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
ProveReplicaUpdate: planOne(
on(SectorProveReplicaUpdate{}, FinalizeReplicaUpdate),
on(SectorProveReplicaUpdateFailed{}, ReplicaUpdateFailed),
on(SectorDealsExpired{}, SnapDealsDealsExpired),
on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
FinalizeReplicaUpdate: planOne(
on(SectorFinalized{}, SubmitReplicaUpdate),
on(SectorFinalizeFailed{}, FinalizeReplicaUpdateFailed),
),
SubmitReplicaUpdate: planOne(
on(SectorReplicaUpdateSubmitted{}, ReplicaUpdateWait),
on(SectorSubmitReplicaUpdateFailed{}, ReplicaUpdateFailed),
2022-11-07 14:56:53 +00:00
on(SectorDeadlineImmutable{}, WaitMutable),
),
WaitMutable: planOne(
on(SectorDeadlineMutable{}, SubmitReplicaUpdate),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
ReplicaUpdateWait: planOne(
on(SectorReplicaUpdateLanded{}, UpdateActivating),
on(SectorSubmitReplicaUpdateFailed{}, ReplicaUpdateFailed),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
UpdateActivating: planOne(
on(SectorUpdateActive{}, ReleaseSectorKey),
),
ReleaseSectorKey: planOne(
on(SectorKeyReleased{}, Proving),
on(SectorReleaseKeyFailed{}, ReleaseSectorKeyFailed),
),
2020-06-22 16:42:38 +00:00
// Sealing errors
AddPieceFailed: planOne(
on(SectorRetryWaitDeals{}, WaitDeals),
apply(SectorStartPacking{}),
apply(SectorAddPiece{}),
),
SealPreCommit1Failed: planOne(
on(SectorRetrySealPreCommit1{}, PreCommit1),
),
SealPreCommit2Failed: planOne(
on(SectorRetrySealPreCommit1{}, PreCommit1),
on(SectorRetrySealPreCommit2{}, PreCommit2),
),
2020-04-06 20:23:37 +00:00
PreCommitFailed: planOne(
on(SectorRetryPreCommit{}, PreCommitting),
on(SectorRetryPreCommitWait{}, PreCommitWait),
2020-04-06 20:23:37 +00:00
on(SectorRetryWaitSeed{}, WaitSeed),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
2020-06-02 21:45:28 +00:00
on(SectorPreCommitLanded{}, WaitSeed),
onWithCB(SectorDealsExpired{}, DealsExpired, maybeNotifyRemoteDone(false, "DealsExpired")),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
2020-01-23 17:34:04 +00:00
),
2020-04-06 20:23:37 +00:00
ComputeProofFailed: planOne(
on(SectorRetryComputeProof{}, Committing),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
),
2022-09-09 10:54:48 +00:00
RemoteCommitFailed: planOne(
2022-09-09 08:32:27 +00:00
on(SectorRetryComputeProof{}, Committing),
),
CommitFinalizeFailed: planOne(
2021-06-18 04:09:02 +00:00
on(SectorRetryFinalize{}, CommitFinalize),
),
2020-04-06 20:23:37 +00:00
CommitFailed: planOne(
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
2020-04-06 20:23:37 +00:00
on(SectorRetryWaitSeed{}, WaitSeed),
on(SectorRetryComputeProof{}, Committing),
on(SectorRetryInvalidProof{}, Committing),
2020-06-17 15:19:36 +00:00
on(SectorRetryPreCommitWait{}, PreCommitWait),
2020-08-05 01:30:58 +00:00
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorRetryPreCommit{}, PreCommitting),
on(SectorRetryCommitWait{}, CommitWait),
on(SectorRetrySubmitCommit{}, SubmitCommit),
onWithCB(SectorDealsExpired{}, DealsExpired, maybeNotifyRemoteDone(false, "DealsExpired")),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
onWithCB(SectorTicketExpired{}, Removing, maybeNotifyRemoteDone(false, "Removing")),
),
2020-06-03 21:42:13 +00:00
FinalizeFailed: planOne(
on(SectorRetryFinalize{}, FinalizeSector),
),
PackingFailed: planOne(), // TODO: Deprecated, remove
2020-08-27 21:14:46 +00:00
DealsExpired: planOne(
// SectorRemove (global)
),
RecoverDealIDs: planOne(
onReturning(SectorUpdateDealIDs{}),
),
// Snap Deals Errors
SnapDealsAddPieceFailed: planOne(
on(SectorRetryWaitDeals{}, SnapDealsWaitDeals),
apply(SectorStartPacking{}),
apply(SectorAddPiece{}),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
SnapDealsDealsExpired: planOne(
on(SectorAbortUpgrade{}, AbortUpgrade),
),
SnapDealsRecoverDealIDs: planOne(
on(SectorUpdateDealIDs{}, SubmitReplicaUpdate),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
AbortUpgrade: planOneOrIgnore(
on(SectorRevertUpgradeToProving{}, Proving),
),
ReplicaUpdateFailed: planOne(
on(SectorRetrySubmitReplicaUpdateWait{}, ReplicaUpdateWait),
on(SectorRetrySubmitReplicaUpdate{}, SubmitReplicaUpdate),
on(SectorRetryReplicaUpdate{}, UpdateReplica),
on(SectorRetryProveReplicaUpdate{}, ProveReplicaUpdate),
on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs),
on(SectorDealsExpired{}, SnapDealsDealsExpired),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
ReleaseSectorKeyFailed: planOne(
on(SectorUpdateActive{}, ReleaseSectorKey),
),
2022-03-02 16:33:33 +00:00
FinalizeReplicaUpdateFailed: planOne(
on(SectorRetryFinalize{}, FinalizeReplicaUpdate),
),
2020-06-22 16:42:38 +00:00
// Post-seal
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
2022-03-16 16:33:05 +00:00
on(SectorMarkForUpdate{}, Available),
),
Available: planOne(
on(SectorStartCCUpdate{}, SnapDealsWaitDeals),
on(SectorAbortUpgrade{}, Proving),
2020-06-22 16:42:38 +00:00
),
2021-01-12 23:42:01 +00:00
Terminating: planOne(
on(SectorTerminating{}, TerminateWait),
on(SectorTerminateFailed{}, TerminateFailed),
),
TerminateWait: planOne(
on(SectorTerminated{}, TerminateFinality),
on(SectorTerminateFailed{}, TerminateFailed),
),
TerminateFinality: planOne(
on(SectorTerminateFailed{}, TerminateFailed),
// SectorRemove (global)
),
TerminateFailed: planOne(
// SectorTerminating (global)
),
Removing: planOneOrIgnore(
2020-06-22 16:42:38 +00:00
on(SectorRemoved{}, Removed),
on(SectorRemoveFailed{}, RemoveFailed),
),
2020-08-27 21:59:01 +00:00
RemoveFailed: planOne(
// SectorRemove (global)
),
Faulty: planOne(
on(SectorFaultReported{}, FaultReported),
),
2020-06-22 16:42:38 +00:00
2020-11-26 09:57:47 +00:00
FaultReported: final, // not really supported right now
FaultedFinal: final,
2022-03-28 11:07:12 +00:00
Removed: finalRemoved,
2020-11-26 09:57:47 +00:00
FailedUnrecoverable: final,
}
// logAppend appends l to the sector log, bounding the log's size.
//
// Once the log holds more than 8000 entries it is compacted before appending:
// entries 0..1999 are kept, entry 2000 is replaced by a "truncate" marker, the
// entries between the marker and index 6000 are dropped, and everything from
// index 6000 onwards is kept.
func (state *SectorInfo) logAppend(l Log) {
	if len(state.Log) > 8000 {
		log.Warnw("truncating sector log", "sector", state.SectorNumber)
		state.Log[2000] = Log{
			Timestamp: uint64(time.Now().Unix()),
			Message:   "truncating log (above 8000 entries)",
			Kind:      "truncate",
		}

		// The kept prefix must include index 2000. Slicing with [:2000] would
		// exclude the marker and the append would then overwrite it.
		state.Log = append(state.Log[:2001], state.Log[6000:]...)
	}

	state.Log = append(state.Log, l)
}
func (m *Sealing) logEvents(events []statemachine.Event, state *SectorInfo) {
2020-01-22 20:29:19 +00:00
for _, event := range events {
2021-05-30 13:13:38 +00:00
log.Debugw("sector event", "sector", state.SectorNumber, "type", fmt.Sprintf("%T", event.User), "event", event.User)
2020-03-22 20:44:27 +00:00
e, err := json.Marshal(event)
if err != nil {
log.Errorf("marshaling event for logging: %+v", err)
continue
}
if event.User == (SectorRestart{}) {
continue // don't log on every fsm restart
}
2021-05-30 17:24:42 +00:00
if len(e) > 8000 {
e = []byte(string(e[:8000]) + "... truncated")
}
2020-01-22 20:29:19 +00:00
l := Log{
Timestamp: uint64(time.Now().Unix()),
2020-03-22 20:44:27 +00:00
Message: string(e),
2020-01-22 20:29:19 +00:00
Kind: fmt.Sprintf("event;%T", event.User),
}
if err, iserr := event.User.(xerrors.Formatter); iserr {
l.Trace = fmt.Sprintf("%+v", err)
}
state.logAppend(l)
2020-01-22 20:29:19 +00:00
}
}
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, uint64, error) {
/////
// First process all events
m.logEvents(events, state)
2020-01-22 20:29:19 +00:00
if m.notifee != nil {
defer func(before SectorInfo) {
m.notifee(before, *state)
}(*state) // take safe-ish copy of the before state (except for nested pointers)
}
p := fsmPlanners[state.State]
if p == nil {
if len(events) == 1 {
if _, ok := events[0].User.(globalMutator); ok {
p = planOne() // in case we're in a really weird state, allow restart / update state / remove
}
}
if p == nil {
return nil, 0, xerrors.Errorf("planner for state %s not found", state.State)
}
}
processed, err := p(events, state)
if err != nil {
return nil, processed, xerrors.Errorf("running planner for state %s failed: %w", state.State, err)
}
/////
// Now decide what to do next
/*
2021-01-18 13:26:03 +00:00
UndefinedSectorState (start)
v |
*<- WaitDeals <-> AddPiece |
| | /--------------------/
| v v
*<- Packing <- incoming committed capacity
| |
| v
2020-09-29 07:57:36 +00:00
| GetTicket
| | ^
| v |
*<- PreCommit1 <--> SealPreCommit1Failed
| | ^ ^^
| | *----------++----\
| v v || |
*<- PreCommit2 --------++--> SealPreCommit2Failed
| | ||
| v /-------/|
* PreCommitting <-----+---> PreCommitFailed
| | | ^
| v | |
*<- WaitSeed -----------+-----/
| ||| ^ |
| ||| \--------*-----/
| ||| |
| vvv v----+----> ComputeProofFailed
*<- Committing |
| | ^--> CommitFailed
| v ^
2021-09-23 02:39:39 +00:00
| SubmitCommit |
| | |
| v |
*<- CommitWait ---/
| |
| v
| FinalizeSector <--> FinalizeFailed
| |
| v
*<- Proving
|
v
FailedUnrecoverable
*/
if err := m.onUpdateSector(context.TODO(), state); err != nil {
log.Errorw("update sector stats", "error", err)
}
switch state.State {
case ReceiveSector:
return m.handleReceiveSector, processed, nil
// Happy path
2020-06-26 15:29:08 +00:00
case Empty:
fallthrough
case WaitDeals:
2021-01-18 13:26:03 +00:00
return m.handleWaitDeals, processed, nil
case AddPiece:
return m.handleAddPiece, processed, nil
case Packing:
return m.handlePacking, processed, nil
2020-09-29 07:57:36 +00:00
case GetTicket:
return m.handleGetTicket, processed, nil
2020-04-06 20:23:37 +00:00
case PreCommit1:
return m.handlePreCommit1, processed, nil
2020-04-06 20:23:37 +00:00
case PreCommit2:
return m.handlePreCommit2, processed, nil
2020-04-06 20:23:37 +00:00
case PreCommitting:
return m.handlePreCommitting, processed, nil
2021-05-18 15:21:10 +00:00
case SubmitPreCommitBatch:
return m.handleSubmitPreCommitBatch, processed, nil
2021-05-18 15:37:52 +00:00
case PreCommitBatchWait:
fallthrough
2020-05-18 22:49:21 +00:00
case PreCommitWait:
return m.handlePreCommitWait, processed, nil
case WaitSeed:
return m.handleWaitSeed, processed, nil
case Committing:
return m.handleCommitting, processed, nil
case SubmitCommit:
return m.handleSubmitCommit, processed, nil
2021-05-18 15:21:10 +00:00
case SubmitCommitAggregate:
return m.handleSubmitCommitAggregate, processed, nil
2021-03-10 15:16:44 +00:00
case CommitAggregateWait:
fallthrough
case CommitWait:
return m.handleCommitWait, processed, nil
2021-06-11 09:41:28 +00:00
case CommitFinalize:
fallthrough
case FinalizeSector:
return m.handleFinalizeSector, processed, nil
// Snap deals updates
case SnapDealsWaitDeals:
return m.handleWaitDeals, processed, nil
case SnapDealsAddPiece:
return m.handleAddPiece, processed, nil
case SnapDealsPacking:
return m.handlePacking, processed, nil
case UpdateReplica:
return m.handleReplicaUpdate, processed, nil
case ProveReplicaUpdate:
return m.handleProveReplicaUpdate, processed, nil
case SubmitReplicaUpdate:
return m.handleSubmitReplicaUpdate, processed, nil
2022-11-07 14:56:53 +00:00
case WaitMutable:
return m.handleWaitMutable, processed, nil
case ReplicaUpdateWait:
return m.handleReplicaUpdateWait, processed, nil
case FinalizeReplicaUpdate:
return m.handleFinalizeReplicaUpdate, processed, nil
case UpdateActivating:
return m.handleUpdateActivating, processed, nil
case ReleaseSectorKey:
return m.handleReleaseSectorKey, processed, nil
// Handled failure modes
case AddPieceFailed:
return m.handleAddPieceFailed, processed, nil
case SealPreCommit1Failed:
return m.handleSealPrecommit1Failed, processed, nil
case SealPreCommit2Failed:
return m.handleSealPrecommit2Failed, processed, nil
case PreCommitFailed:
return m.handlePreCommitFailed, processed, nil
2020-04-06 20:23:37 +00:00
case ComputeProofFailed:
return m.handleComputeProofFailed, processed, nil
2022-09-09 10:54:48 +00:00
case RemoteCommitFailed:
return m.handleRemoteCommitFailed, processed, nil
2020-04-06 20:23:37 +00:00
case CommitFailed:
return m.handleCommitFailed, processed, nil
case CommitFinalizeFailed:
fallthrough
2020-06-03 21:42:13 +00:00
case FinalizeFailed:
return m.handleFinalizeFailed, processed, nil
case PackingFailed: // DEPRECATED: remove this for the next reset
state.State = DealsExpired
fallthrough
case DealsExpired:
return m.handleDealsExpired, processed, nil
case RecoverDealIDs:
return m.HandleRecoverDealIDs, processed, nil
// Snap Deals failure modes
case SnapDealsAddPieceFailed:
return m.handleAddPieceFailed, processed, nil
case SnapDealsDealsExpired:
return m.handleDealsExpiredSnapDeals, processed, nil
case SnapDealsRecoverDealIDs:
return m.handleSnapDealsRecoverDealIDs, processed, nil
case ReplicaUpdateFailed:
return m.handleSubmitReplicaUpdateFailed, processed, nil
case ReleaseSectorKeyFailed:
return m.handleReleaseSectorKeyFailed, 0, err
2022-03-02 16:33:33 +00:00
case FinalizeReplicaUpdateFailed:
return m.handleFinalizeFailed, processed, nil
case AbortUpgrade:
return m.handleAbortUpgrade, processed, nil
2020-06-22 16:42:38 +00:00
// Post-seal
case Proving:
return m.handleProvingSector, processed, nil
2022-03-16 16:33:05 +00:00
case Available:
return m.handleAvailableSector, processed, nil
2021-01-12 23:42:01 +00:00
case Terminating:
return m.handleTerminating, processed, nil
case TerminateWait:
return m.handleTerminateWait, processed, nil
case TerminateFinality:
return m.handleTerminateFinality, processed, nil
case TerminateFailed:
return m.handleTerminateFailed, processed, nil
2020-06-22 16:42:38 +00:00
case Removing:
return m.handleRemoving, processed, nil
2020-07-14 08:41:19 +00:00
case Removed:
return nil, processed, errSectorRemoved
2020-06-22 16:42:38 +00:00
2020-08-27 21:59:01 +00:00
case RemoveFailed:
return m.handleRemoveFailed, processed, nil
2020-06-22 16:42:38 +00:00
// Faults
case Faulty:
return m.handleFaulty, processed, nil
case FaultReported:
return m.handleFaultReported, processed, nil
// Fatal errors
case UndefinedSectorState:
log.Error("sector update with undefined state!")
return nil, processed, xerrors.Errorf("sector update with undefined state")
case FailedUnrecoverable:
2020-04-06 22:31:33 +00:00
log.Errorf("sector %d failed unrecoverably", state.SectorNumber)
return nil, processed, xerrors.Errorf("sector %d failed unrecoverably", state.SectorNumber)
default:
2020-05-18 22:49:21 +00:00
log.Errorf("unexpected sector update state: %s", state.State)
return nil, processed, xerrors.Errorf("unexpected sector update state: %s", state.State)
}
}
func (m *Sealing) onUpdateSector(ctx context.Context, state *SectorInfo) error {
2021-03-12 13:56:46 +00:00
if m.getConfig == nil {
return nil // tests
}
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}
shouldUpdateInput := m.stats.updateSector(ctx, cfg, m.minerSectorID(state.SectorNumber), state.State)
// trigger more input processing when we've dipped below max sealing limits
if shouldUpdateInput {
2021-05-27 03:53:33 +00:00
sp, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting seal proof type: %w", err)
}
go func() {
2021-03-12 14:01:00 +00:00
m.inputLk.Lock()
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
}()
}
return nil
}
func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) {
for i, event := range events {
switch e := event.User.(type) {
case globalMutator:
if e.applyGlobal(state) {
return uint64(i + 1), nil
}
case SectorCommitted: // the normal case
e.apply(state)
state.State = SubmitCommit
2021-06-11 09:41:28 +00:00
case SectorProofReady: // early finalize
e.apply(state)
state.State = CommitFinalize
case SectorSeedReady: // seed changed :/
if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) {
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
continue // or it didn't!
}
log.Warnf("planCommitting: commit Seed changed")
e.apply(state)
state.State = Committing
return uint64(i + 1), nil
2020-01-20 22:04:46 +00:00
case SectorComputeProofFailed:
2020-04-06 20:23:37 +00:00
state.State = ComputeProofFailed
2022-09-09 10:54:48 +00:00
case SectorRemoteCommit1Failed, SectorRemoteCommit2Failed:
state.State = RemoteCommitFailed
case SectorSealPreCommit1Failed:
2020-07-15 01:41:35 +00:00
state.State = SealPreCommit1Failed
case SectorCommitFailed:
state.State = CommitFailed
case SectorRetryCommitWait:
state.State = CommitWait
default:
return uint64(i), xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
}
}
return uint64(len(events)), nil
}
func (m *Sealing) restartSectors(ctx context.Context) error {
2021-06-15 19:04:11 +00:00
defer m.startupWait.Done()
trackedSectors, err := m.ListSectors()
if err != nil {
log.Errorf("loading sector list: %+v", err)
}
for _, sector := range trackedSectors {
2020-04-06 22:31:33 +00:00
if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil {
log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err)
}
}
// TODO: Grab on-chain sector set and diff with trackedSectors
return nil
}
func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error {
2021-06-15 19:04:11 +00:00
m.startupWait.Wait()
return m.sectors.Send(id, SectorForceState{state})
}
2022-03-28 11:07:12 +00:00
// finalRemoved is the planner for the Removed state. The sector has already
// been removed, so later events do not matter: every event is reported as
// processed and the state is left untouched.
func finalRemoved(events []statemachine.Event, state *SectorInfo) (uint64, error) {
	consumed := uint64(len(events))
	return consumed, nil
}
// final is the planner for terminal states. It only looks at the first pending
// event: if that event is a globalMutator it is applied and counted as the one
// processed event. Anything else, including an empty batch, is an error with
// zero events processed.
func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
	var global globalMutator
	if len(events) > 0 {
		// A failed assertion leaves global nil and falls into the error path below.
		global, _ = events[0].User.(globalMutator)
	}

	if global == nil {
		return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
	}

	global.applyGlobal(state)
	return 1, nil
}
2021-01-18 20:59:34 +00:00
func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
state.State = next
2021-01-18 20:59:34 +00:00
return false, nil
}
}
}
// onWithCB builds a planOne transition like on, with a hook: cb is invoked with
// the sector info before the state is switched to next, so it still sees the
// previous state. The transition reports more=false.
func onWithCB(mut mutator, next SectorState, cb func(info *SectorInfo)) func() (mutator, func(*SectorInfo) (bool, error)) {
	transition := func(sector *SectorInfo) (bool, error) {
		cb(sector)
		sector.State = next
		return false, nil
	}

	return func() (mutator, func(*SectorInfo) (bool, error)) {
		return mut, transition
	}
}
2021-01-18 20:59:34 +00:00
// apply builds a planOne transition that, unlike on, leaves the sector state
// alone. It reports more=true, which makes planOne carry on with the next
// pending event.
func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
	keepGoing := func(*SectorInfo) (bool, error) {
		return true, nil
	}

	return func() (mutator, func(*SectorInfo) (bool, error)) {
		return mut, keepGoing
	}
}
2021-01-18 20:59:34 +00:00
func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
if state.Return == "" {
2021-01-18 20:59:34 +00:00
return false, xerrors.Errorf("return state not set")
}
state.State = SectorState(state.Return)
state.Return = ""
2021-01-18 20:59:34 +00:00
return false, nil
}
}
}
2021-01-18 20:59:34 +00:00
func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
eloop:
2021-01-18 20:59:34 +00:00
for i, event := range events {
if gm, ok := event.User.(globalMutator); ok {
gm.applyGlobal(state)
return uint64(i + 1), nil
}
2020-01-22 18:30:56 +00:00
2021-01-18 20:59:34 +00:00
for _, t := range ts {
mut, next := t()
2021-01-18 20:59:34 +00:00
if reflect.TypeOf(event.User) != reflect.TypeOf(mut) {
continue
}
if err, iserr := event.User.(error); iserr {
log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, event.User, err)
}
2021-01-18 20:59:34 +00:00
event.User.(mutator).apply(state)
more, err := next(state)
if err != nil || !more {
return uint64(i + 1), err
}
continue eloop
2020-01-21 16:05:10 +00:00
}
2021-01-18 20:59:34 +00:00
_, ok := event.User.(Ignorable)
if ok {
continue
}
2021-01-18 20:59:34 +00:00
return uint64(i + 1), xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, event.User, event)
}
2021-01-18 20:59:34 +00:00
return uint64(len(events)), nil
}
}
// planOneOrIgnore wraps planOne so that planning errors are not fatal: any
// error from the underlying planner is logged as a warning and dropped, while
// the processed-event count is passed through unchanged. This avoids having to
// list every possible event for states that may be entered by a forced
// override.
func planOneOrIgnore(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
	strict := planOne(ts...)

	return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
		processed, planErr := strict(events, state)
		if planErr == nil {
			return processed, nil
		}

		log.Warnf("planOneOrIgnore: ignoring error from planOne: %s", planErr)
		return processed, nil
	}
}
// maybeNotifyRemoteDone will send sealing-done notification to the RemoteSealingDone
// if the RemoteSealingDoneEndpoint is set. If RemoteSealingDoneEndpoint is not set,
// this is no-op
func maybeNotifyRemoteDone(success bool, state string) func(*SectorInfo) {
return func(sector *SectorInfo) {
if sector.RemoteSealingDoneEndpoint == "" {
return
}
reqData := api.RemoteSealingDoneParams{
Successful: success,
State: state,
CommitMessage: sector.CommitMessage,
}
reqBody, err := json.Marshal(&reqData)
if err != nil {
log.Errorf("marshaling remote done notification request params: %s", err)
return
}
req, err := http.NewRequest("POST", sector.RemoteSealingDoneEndpoint, bytes.NewReader(reqBody))
if err != nil {
log.Errorf("creating new remote done notification request: %s", err)
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Errorf("sending remote done notification: %s", err)
return
}
2022-09-09 12:57:56 +00:00
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
log.Errorf("remote done notification received non-200 http response %s", resp.Status)
return
}
}
}