Merge pull request #1124 from filecoin-project/feat/sector-recovery

sealing: Implement some common error states
This commit is contained in:
Łukasz Magiera 2020-01-24 21:25:03 +01:00 committed by GitHub
commit 9fc5f0cd9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 658 additions and 56 deletions

View File

@ -19,7 +19,7 @@ const (
Unsealed // sealing / queued
PreCommitting // on chain pre-commit
PreCommitted // waiting for seed
WaitSeed // waiting for seed
Committing
CommitWait // waiting for message to land on chain
Proving
@ -45,7 +45,7 @@ const (
PreCommitFailed
SealCommitFailed
CommitFailed
_
PackingFailed
_
_
_
@ -61,7 +61,7 @@ var SectorStates = []string{
Packing: "Packing",
Unsealed: "Unsealed",
PreCommitting: "PreCommitting",
PreCommitted: "PreCommitted",
WaitSeed: "WaitSeed",
Committing: "Committing",
CommitWait: "CommitWait",
Proving: "Proving",
@ -70,6 +70,7 @@ var SectorStates = []string{
PreCommitFailed: "PreCommitFailed",
SealCommitFailed: "SealCommitFailed",
CommitFailed: "CommitFailed",
PackingFailed: "PackingFailed",
FailedUnrecoverable: "FailedUnrecoverable",
@ -107,6 +108,15 @@ type StorageMiner interface {
WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error
}
type SectorLog struct {
Kind string
Timestamp uint64
Trace string
Message string
}
type SectorInfo struct {
SectorID uint64
State SectorState
@ -119,6 +129,8 @@ type SectorInfo struct {
Retries uint64
LastErr string
Log []SectorLog
}
type SealedRef struct {

View File

@ -73,7 +73,7 @@ func prepSyncTest(t testing.TB, h int) *syncTestUtil {
g, err := gen.NewGenerator()
if err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
ctx, cancel := context.WithCancel(context.Background())

View File

@ -15,6 +15,14 @@ import (
"github.com/filecoin-project/lotus/lib/tarutil"
)
func (w *worker) sizeForType(typ string) int64 {
size := int64(w.sb.SectorSize())
if typ == "cache" {
size *= 10
}
return size
}
func (w *worker) fetch(typ string, sectorID uint64) error {
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
@ -37,7 +45,7 @@ func (w *worker) fetch(typ string, sectorID uint64) error {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
bar := pb.New64(resp.ContentLength)
bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
@ -88,7 +96,7 @@ func (w *worker) push(typ string, sectorID uint64) error {
return xerrors.Errorf("opening push reader: %w", err)
}
bar := pb.New64(0)
bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.ShowCounters = true

View File

@ -2,8 +2,11 @@ package main
import (
"fmt"
"os"
"sort"
"strconv"
"text/tabwriter"
"time"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
@ -41,6 +44,12 @@ var sectorsCmd = &cli.Command{
var sectorsStatusCmd = &cli.Command{
Name: "status",
Usage: "Get the seal status of a sector by its ID",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "log",
Usage: "display event log",
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
@ -77,6 +86,17 @@ var sectorsStatusCmd = &cli.Command{
if status.LastErr != "" {
fmt.Printf("Last Error:\t\t%s\n", status.LastErr)
}
if cctx.Bool("log") {
fmt.Printf("--------\nEvent Log:\n")
for i, l := range status.Log {
fmt.Printf("%d.\t%s:\t[%s]\t%s\n", i, time.Unix(int64(l.Timestamp), 0), l.Kind, l.Message)
if l.Trace != "" {
fmt.Printf("\t%s\n", l.Trace)
}
}
}
return nil
},
}
@ -131,17 +151,19 @@ var sectorsListCmd = &cli.Command{
return list[i] < list[j]
})
w := tabwriter.NewWriter(os.Stdout, 8, 4, 1, ' ', 0)
for _, s := range list {
st, err := nodeApi.SectorsStatus(ctx, s)
if err != nil {
fmt.Printf("%d:\tError: %s\n", s, err)
fmt.Fprintf(w, "%d:\tError: %s\n", s, err)
continue
}
_, inSSet := commitedIDs[s]
_, inPSet := provingIDs[s]
fmt.Printf("%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n",
fmt.Fprintf(w, "%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n",
s,
api.SectorStates[st.State],
yesno(inSSet),
@ -151,7 +173,8 @@ var sectorsListCmd = &cli.Command{
st.Deals,
)
}
return nil
return w.Flush()
},
}

View File

@ -127,6 +127,7 @@ func main() {
sealing.SealSeed{},
sealing.Piece{},
sealing.SectorInfo{},
sealing.Log{},
)
if err != nil {
fmt.Println(err)

2
go.mod
View File

@ -20,7 +20,7 @@ require (
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8
github.com/filecoin-project/go-paramfetch v0.0.1
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200122195713-697609991669
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55
github.com/filecoin-project/go-statestore v0.1.0
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-ole/go-ole v1.2.4 // indirect

4
go.sum
View File

@ -117,8 +117,8 @@ github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.
github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE=
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200122195713-697609991669 h1:SpwORqUXMVB2Ejr8c4zIGiihxGM5Tu15skOWa5pvRr8=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200122195713-697609991669/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 h1:XChPRKPZL+/N6a3ccLmjCJ7JrR+SFLFJDllv0BkxW4I=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=

View File

@ -227,7 +227,16 @@ eventLoop:
}
} else {
nextRound := time.Unix(int64(base.ts.MinTimestamp()+uint64(build.BlockDelay*base.nullRounds)), 0)
time.Sleep(time.Until(nextRound))
select {
case <-time.After(time.Until(nextRound)):
case <-m.stop:
stopping := m.stopping
m.stop = nil
m.stopping = nil
close(stopping)
return
}
}
}
}

View File

@ -160,6 +160,16 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (api.S
deals[i] = piece.DealID
}
log := make([]api.SectorLog, len(info.Log))
for i, l := range info.Log {
log[i] = api.SectorLog{
Kind: l.Kind,
Timestamp: l.Timestamp,
Trace: l.Trace,
Message: l.Message,
}
}
return api.SectorInfo{
SectorID: sid,
State: info.State,
@ -172,6 +182,7 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (api.S
Retries: info.Nonce,
LastErr: info.LastErr,
Log: log,
}, nil
}

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/store"
@ -35,9 +36,6 @@ type Miner struct {
worker address.Address
sealing *sealing.Sealing
stop chan struct{}
stopped chan struct{}
}
type storageMinerApi interface {
@ -51,6 +49,7 @@ type storageMinerApi interface {
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
@ -59,6 +58,7 @@ type storageMinerApi interface {
ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error)
@ -74,9 +74,6 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto
tktFn: tktFn,
maddr: addr,
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
return m, nil
@ -107,14 +104,7 @@ func (m *Miner) Run(ctx context.Context) error {
func (m *Miner) Stop(ctx context.Context) error {
defer m.sealing.Stop(ctx)
close(m.stop)
select {
case <-m.stopped:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (m *Miner) runPreflightChecks(ctx context.Context) error {

View File

@ -207,6 +207,10 @@ func (sb *SBMock) GetPath(string, string) (string, error) {
panic("nyi")
}
func (sb *SBMock) CanCommit(sectorID uint64) (bool, error) {
return true, nil
}
func (sb *SBMock) WorkerStats() sectorbuilder.WorkerStats {
panic("nyi")
}

View File

@ -395,7 +395,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{173}); err != nil {
if _, err := w.Write([]byte{174}); err != nil {
return err
}
@ -661,6 +661,31 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
if _, err := w.Write([]byte(t.LastErr)); err != nil {
return err
}
// t.Log ([]sealing.Log) (slice)
if len("Log") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Log\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Log")))); err != nil {
return err
}
if _, err := w.Write([]byte("Log")); err != nil {
return err
}
if len(t.Log) > cbg.MaxLength {
return xerrors.Errorf("Slice value in field t.Log was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Log)))); err != nil {
return err
}
for _, v := range t.Log {
if err := v.MarshalCBOR(w); err != nil {
return err
}
}
return nil
}
@ -915,6 +940,211 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
t.LastErr = string(sval)
}
// t.Log ([]sealing.Log) (slice)
case "Log":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Log: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Log = make([]Log, extra)
}
for i := 0; i < int(extra); i++ {
var v Log
if err := v.UnmarshalCBOR(br); err != nil {
return err
}
t.Log[i] = v
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
}
return nil
}
func (t *Log) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{164}); err != nil {
return err
}
// t.Timestamp (uint64) (uint64)
if len("Timestamp") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Timestamp\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Timestamp")))); err != nil {
return err
}
if _, err := w.Write([]byte("Timestamp")); err != nil {
return err
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Timestamp))); err != nil {
return err
}
// t.Trace (string) (string)
if len("Trace") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Trace\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Trace")))); err != nil {
return err
}
if _, err := w.Write([]byte("Trace")); err != nil {
return err
}
if len(t.Trace) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Trace was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Trace)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Trace)); err != nil {
return err
}
// t.Message (string) (string)
if len("Message") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Message\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Message")))); err != nil {
return err
}
if _, err := w.Write([]byte("Message")); err != nil {
return err
}
if len(t.Message) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Message was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Message)); err != nil {
return err
}
// t.Kind (string) (string)
if len("Kind") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Kind\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Kind")))); err != nil {
return err
}
if _, err := w.Write([]byte("Kind")); err != nil {
return err
}
if len(t.Kind) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Kind was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Kind)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Kind)); err != nil {
return err
}
return nil
}
func (t *Log) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("Log: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.Timestamp (uint64) (uint64)
case "Timestamp":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Timestamp = uint64(extra)
// t.Trace (string) (string)
case "Trace":
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.Trace = string(sval)
}
// t.Message (string) (string)
case "Message":
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.Message = string(sval)
}
// t.Kind (string) (string)
case "Kind":
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.Kind = string(sval)
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)

105
storage/sealing/checks.go Normal file
View File

@ -0,0 +1,105 @@
package sealing
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
)
// TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting
// We should implement some wait-for-api logic
type ErrApi struct{ error }
type ErrInvalidDeals struct{ error }
type ErrExpiredDeals struct{ error }
type ErrBadCommD struct{ error }
type ErrExpiredTicket struct{ error }
// checkPieces validates that:
// - Each piece han a corresponding on chain deal
// - Piece commitments match with on chain deals
// - Piece sizes match
// - Deals aren't expired
func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error {
head, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}
for i, piece := range si.Pieces {
deal, err := api.StateMarketStorageDeal(ctx, piece.DealID, nil)
if err != nil {
return &ErrApi{xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err)}
}
if string(deal.PieceRef) != string(piece.CommP) {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with wrong CommP: %x != %x", i, len(si.Pieces), si.SectorID, piece.DealID, piece.CommP, deal.PieceRef)}
}
if piece.Size != deal.PieceSize {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorID, piece.DealID, piece.Size, deal.PieceSize)}
}
if head.Height() >= deal.ProposalExpiration {
return &ErrExpiredDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers expired deal %d - expires %d, head %d", i, len(si.Pieces), si.SectorID, piece.DealID, deal.ProposalExpiration, head.Height())}
}
}
return nil
}
// checkSeal checks that data commitment generated in the sealing process
// matches pieces, and that the seal ticket isn't expired
func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api sealingApi) (err error) {
head, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}
ssize, err := api.StateMinerSectorSize(ctx, maddr, head)
if err != nil {
return &ErrApi{err}
}
ccparams, err := actors.SerializeParams(&actors.ComputeDataCommitmentParams{
DealIDs: si.deals(),
SectorSize: ssize,
})
if err != nil {
return xerrors.Errorf("computing params for ComputeDataCommitment: %w", err)
}
ccmt := &types.Message{
To: actors.StorageMarketAddress,
From: maddr,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(9999999999),
Method: actors.SMAMethods.ComputeDataCommitment,
Params: ccparams,
}
r, err := api.StateCall(ctx, ccmt, nil)
if err != nil {
return &ErrApi{xerrors.Errorf("calling ComputeDataCommitment: %w", err)}
}
if r.ExitCode != 0 {
return &ErrBadCommD{xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode)}
}
if string(r.Return) != string(si.CommD) {
return &ErrBadCommD{xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD)}
}
if int64(head.Height())-int64(si.Ticket.BlockHeight+build.SealRandomnessLookback) > build.SealRandomnessLookbackLimit {
return &ErrExpiredTicket{xerrors.Errorf("ticket expired: seal height: %d, head: %d", si.Ticket.BlockHeight+build.SealRandomnessLookback, head.Height())}
}
return nil
}

View File

@ -2,7 +2,9 @@ package sealing
import (
"context"
"fmt"
"reflect"
"time"
"golang.org/x/xerrors"
@ -19,9 +21,8 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
return func(ctx statemachine.Context, si SectorInfo) error {
err := next(ctx, si)
if err != nil {
if err := ctx.Send(SectorFatalError{error: err}); err != nil {
return xerrors.Errorf("error while sending error: reporting %+v: %w", err, err)
}
log.Errorf("unhandled sector error (%d): %+v", si.SectorID, err)
return nil
}
return nil
@ -34,12 +35,14 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{
api.Unsealed: planOne(
on(SectorSealed{}, api.PreCommitting),
on(SectorSealFailed{}, api.SealFailed),
on(SectorPackingFailed{}, api.PackingFailed),
),
api.PreCommitting: planOne(
on(SectorPreCommitted{}, api.PreCommitted),
on(SectorSealFailed{}, api.SealFailed),
on(SectorPreCommitted{}, api.WaitSeed),
on(SectorPreCommitFailed{}, api.PreCommitFailed),
),
api.PreCommitted: planOne(
api.WaitSeed: planOne(
on(SectorSeedReady{}, api.Committing),
on(SectorPreCommitFailed{}, api.PreCommitFailed),
),
@ -54,6 +57,15 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{
on(SectorFaulty{}, api.Faulty),
),
api.SealFailed: planOne(
on(SectorRetrySeal{}, api.Unsealed),
),
api.PreCommitFailed: planOne(
on(SectorRetryPreCommit{}, api.PreCommitting),
on(SectorRetryWaitSeed{}, api.WaitSeed),
on(SectorSealFailed{}, api.SealFailed),
),
api.Faulty: planOne(
on(SectorFaultReported{}, api.FaultReported),
),
@ -64,9 +76,23 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
/////
// First process all events
for _, event := range events {
l := Log{
Timestamp: uint64(time.Now().Unix()),
Message: fmt.Sprintf("%+v", event),
Kind: fmt.Sprintf("event;%T", event.User),
}
if err, iserr := event.User.(xerrors.Formatter); iserr {
l.Trace = fmt.Sprintf("%+v", err)
}
state.Log = append(state.Log, l)
}
p := fsmPlanners[state.State]
if p == nil {
return nil, xerrors.Errorf("planner for state %d not found", state.State)
return nil, xerrors.Errorf("planner for state %s not found", api.SectorStates[state.State])
}
if err := p(events, state); err != nil {
@ -90,7 +116,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
* PreCommitting <--> PreCommitFailed
| | ^
| v |
*<- PreCommitted ------/
*<- WaitSeed ----------/
| |||
| vvv v--> SealCommitFailed
*<- Committing
@ -118,8 +144,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleUnsealed, nil
case api.PreCommitting:
return m.handlePreCommitting, nil
case api.PreCommitted:
return m.handlePreCommitted, nil
case api.WaitSeed:
return m.handleWaitSeed, nil
case api.Committing:
return m.handleCommitting, nil
case api.CommitWait:
@ -130,9 +156,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Handled failure modes
case api.SealFailed:
log.Warnf("sector %d entered unimplemented state 'SealFailed'", state.SectorID)
return m.handleSealFailed, nil
case api.PreCommitFailed:
log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", state.SectorID)
return m.handlePreCommitFailed, nil
case api.SealCommitFailed:
log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", state.SectorID)
case api.CommitFailed:
@ -175,7 +201,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
e.apply(state)
state.State = api.Committing
return nil
case SectorSealCommitFailed:
case SectorComputeProofFailed:
state.State = api.SealCommitFailed
case SectorSealFailed:
state.State = api.CommitFailed
@ -221,7 +247,7 @@ func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []sta
return func(events []statemachine.Event, state *SectorInfo) error {
if len(events) != 1 {
for _, event := range events {
if gm, ok := event.User.(globalMutator); !ok {
if gm, ok := event.User.(globalMutator); ok {
gm.applyGlobal(state)
return nil
}
@ -229,6 +255,11 @@ func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []sta
return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events)
}
if gm, ok := events[0].User.(globalMutator); ok {
gm.applyGlobal(state)
return nil
}
for _, t := range ts {
mut, next := t()
@ -245,6 +276,6 @@ func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []sta
return nil
}
return xerrors.Errorf("planner for state %s received unexpected event %+v", api.SectorStates[state.State], events[0])
return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", api.SectorStates[state.State], events[0].User, events[0])
}
}

View File

@ -60,6 +60,10 @@ func (evt SectorPacked) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.pieces...)
}
type SectorPackingFailed struct{ error }
func (evt SectorPackingFailed) apply(*SectorInfo) {}
type SectorSealed struct {
commR []byte
commD []byte
@ -96,7 +100,7 @@ func (evt SectorSeedReady) apply(state *SectorInfo) {
state.Seed = evt.seed
}
type SectorSealCommitFailed struct{ error }
type SectorComputeProofFailed struct{ error }
type SectorCommitFailed struct{ error }
@ -116,6 +120,22 @@ type SectorProving struct{}
func (evt SectorProving) apply(*SectorInfo) {}
// Failed state recovery
type SectorRetrySeal struct{}
func (evt SectorRetrySeal) apply(state *SectorInfo) {}
type SectorRetryPreCommit struct{}
func (evt SectorRetryPreCommit) apply(state *SectorInfo) {}
type SectorRetryWaitSeed struct{}
func (evt SectorRetryWaitSeed) apply(state *SectorInfo) {}
// Faults
type SectorFaulty struct{}
func (evt SectorFaulty) apply(state *SectorInfo) {}

View File

@ -41,7 +41,7 @@ func TestHappyPath(t *testing.T) {
require.Equal(m.t, m.state.State, api.PreCommitting)
m.planSingle(SectorPreCommitted{})
require.Equal(m.t, m.state.State, api.PreCommitted)
require.Equal(m.t, m.state.State, api.WaitSeed)
m.planSingle(SectorSeedReady{})
require.Equal(m.t, m.state.State, api.Committing)
@ -67,7 +67,7 @@ func TestSeedRevert(t *testing.T) {
require.Equal(m.t, m.state.State, api.PreCommitting)
m.planSingle(SectorPreCommitted{})
require.Equal(m.t, m.state.State, api.PreCommitted)
require.Equal(m.t, m.state.State, api.WaitSeed)
m.planSingle(SectorSeedReady{})
require.Equal(m.t, m.state.State, api.Committing)

View File

@ -6,7 +6,6 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
@ -14,9 +13,11 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/lib/statemachine"
)
@ -37,6 +38,7 @@ type sealingApi interface { // TODO: trim down
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
@ -45,6 +47,7 @@ type sealingApi interface { // TODO: trim down
ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error)

View File

@ -44,10 +44,24 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
}
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
switch err.(type) {
case *ErrApi:
log.Errorf("handleUnsealed: api error, not proceeding: %+v", err)
return nil
case *ErrInvalidDeals:
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)})
case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired deals in sector: %w", err)})
default:
return xerrors.Errorf("checkPieces sanity check error: %w", err)
}
}
log.Infow("performing sector replication...", "sector", sector.SectorID)
ticket, err := m.tktFn(ctx.Context())
if err != nil {
return err
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)})
}
rspco, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, *ticket, sector.pieceInfos())
@ -66,6 +80,20 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
}
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too)
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket:
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
default:
return xerrors.Errorf("checkSeal sanity check error: %w", err)
}
}
params := &actors.SectorPreCommitInfo{
SectorNumber: sector.SectorID,
@ -97,7 +125,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
}
func (m *Sealing) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error {
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
log.Info("Sector precommitted: ", sector.SectorID)
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
@ -147,7 +175,7 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
if err != nil {
return ctx.Send(SectorSealCommitFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
// TODO: Consider splitting states and persist proof for faster recovery
@ -198,8 +226,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
return xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode)
return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)})
}
return ctx.Send(SectorProving{})

View File

@ -0,0 +1,119 @@
package sealing
import (
"bytes"
"fmt"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/lib/statemachine"
)
const minRetryTime = 1 * time.Minute
func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime)
if len(sector.Log) > 0 && !time.Now().After(retryStart) {
log.Infof("%s(%d), waiting %s before retrying", api.SectorStates[sector.State], sector.SectorID, time.Until(retryStart))
select {
case <-time.After(time.Until(retryStart)):
case <-ctx.Context().Done():
return ctx.Context().Err()
}
}
return nil
}
func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo) (*actors.PreCommittedSector, bool) {
act, err := m.api.StateGetActor(ctx.Context(), m.maddr, nil)
if err != nil {
log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err)
return nil, true
}
st, err := m.api.ChainReadObj(ctx.Context(), act.Head)
if err != nil {
log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err)
return nil, true
}
var state actors.StorageMinerActorState
if err := state.UnmarshalCBOR(bytes.NewReader(st)); err != nil {
log.Errorf("handleSealFailed(%d): temp error: unmarshaling miner state: %+v", sector.SectorID, err)
return nil, true
}
pci, found := state.PreCommittedSectors[fmt.Sprint(sector.SectorID)]
if found {
// TODO: If not expired yet, we can just try reusing sealticket
log.Warnf("sector %d found in miner preseal array", sector.SectorID)
return pci, true
}
return nil, false
}
func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) error {
if _, is := m.checkPreCommitted(ctx, sector); is {
// TODO: Remove this after we can re-precommit
return nil // noop, for now
}
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorRetrySeal{})
}
func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
return nil
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too)
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket:
return ctx.Send(SectorSealFailed{xerrors.Errorf("ticket expired error: %w", err)})
default:
return xerrors.Errorf("checkSeal sanity check error: %w", err)
}
}
if pci, is := m.checkPreCommitted(ctx, sector); is && pci != nil {
if sector.PreCommitMessage != nil {
log.Warn("sector %d is precommitted on chain, but we don't have precommit message", sector.SectorID)
return nil // TODO: SeedWait needs this currently
}
if string(pci.Info.CommR) != string(sector.CommR) {
log.Warn("sector %d is precommitted on chain, with different CommR: %x != %x", sector.SectorID, pci.Info.CommR, sector.CommR)
return nil // TODO: remove when the actor allows re-precommit
}
// TODO: we could compare more things, but I don't think we really need to
// CommR tells us that CommD (and CommPs), and the ticket are all matching
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorRetryWaitSeed{})
}
if sector.PreCommitMessage != nil {
log.Warn("retrying precommit even though the message failed to apply")
}
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorRetryPreCommit{})
}

View File

@ -2,9 +2,8 @@ package sealing
import (
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
"github.com/ipfs/go-cid"
)
type SealTicket struct {
@ -46,10 +45,20 @@ func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
return out
}
type Log struct {
Timestamp uint64
Trace string // for errors
Message string
// additional data (Event info)
Kind string
}
type SectorInfo struct {
State api.SectorState
SectorID uint64
Nonce uint64
Nonce uint64 // TODO: remove
// Packing
@ -63,7 +72,7 @@ type SectorInfo struct {
PreCommitMessage *cid.Cid
// PreCommitted
// WaitSeed
Seed SealSeed
// Committing
@ -75,7 +84,7 @@ type SectorInfo struct {
// Debug
LastErr string
// TODO: Log []struct{ts, msg, trace string}
Log []Log
}
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {