Merge pull request #716 from filecoin-project/fix/sectorstore-fails

Miner: improve sector state handling
This commit is contained in:
Łukasz Magiera 2019-12-04 02:10:30 +01:00 committed by GitHub
commit 2619567dfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 301 additions and 185 deletions

View File

@ -2,8 +2,6 @@ package api
import ( import (
"context" "context"
"fmt"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
@ -23,29 +21,30 @@ const (
Committing Committing
Proving Proving
SectorNoUpdate = UndefinedSectorState SealFailed
PreCommitFailed
SealCommitFailed
CommitFailed
FailedUnrecoverable
) )
func SectorStateStr(s SectorState) string { var SectorStates = []string{
switch s { UndefinedSectorState: "UndefinedSectorState",
case UndefinedSectorState: Empty: "Empty",
return "UndefinedSectorState" Packing: "Packing",
case Empty: Unsealed: "Unsealed",
return "Empty" PreCommitting: "PreCommitting",
case Packing: PreCommitted: "PreCommitted",
return "Packing" Committing: "Committing",
case Unsealed: Proving: "Proving",
return "Unsealed"
case PreCommitting: SealFailed: "SealFailed",
return "PreCommitting" PreCommitFailed: "PreCommitFailed",
case PreCommitted: SealCommitFailed: "SealCommitFailed",
return "PreCommitted" CommitFailed: "CommitFailed",
case Committing:
return "Committing" FailedUnrecoverable: "FailedUnrecoverable",
case Proving:
return "Proving"
}
return fmt.Sprintf("<Unknown %d>", s)
} }
// StorageMiner is a low-level interface to the Filecoin network storage miner node // StorageMiner is a low-level interface to the Filecoin network storage miner node
@ -83,6 +82,7 @@ type SectorInfo struct {
Deals []uint64 Deals []uint64
Ticket sectorbuilder.SealTicket Ticket sectorbuilder.SealTicket
Seed sectorbuilder.SealSeed Seed sectorbuilder.SealSeed
LastErr string
} }
type SealedRef struct { type SealedRef struct {

View File

@ -4,6 +4,8 @@ package build
import "os" import "os"
var SectorSizes = []uint64{1024}
// Seconds // Seconds
const BlockDelay = 6 const BlockDelay = 6

View File

@ -5,7 +5,7 @@ import (
"io" "io"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
) )

View File

@ -5,7 +5,7 @@ import (
"io" "io"
"math" "math"
"github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
) )

View File

@ -128,7 +128,7 @@ func sectorsInfo(ctx context.Context, napi api.StorageMiner) (map[string]int, er
return nil, err return nil, err
} }
out[api.SectorStateStr(st.State)]++ out[api.SectorStates[st.State]]++
} }
return out, nil return out, nil

View File

@ -61,7 +61,7 @@ var sectorsStatusCmd = &cli.Command{
} }
fmt.Printf("SectorID:\t%d\n", status.SectorID) fmt.Printf("SectorID:\t%d\n", status.SectorID)
fmt.Printf("Status:\t%s\n", api.SectorStateStr(status.State)) fmt.Printf("Status:\t%s\n", api.SectorStates[status.State])
fmt.Printf("CommD:\t\t%x\n", status.CommD) fmt.Printf("CommD:\t\t%x\n", status.CommD)
fmt.Printf("CommR:\t\t%x\n", status.CommR) fmt.Printf("CommR:\t\t%x\n", status.CommR)
fmt.Printf("Ticket:\t\t%x\n", status.Ticket.TicketBytes) fmt.Printf("Ticket:\t\t%x\n", status.Ticket.TicketBytes)
@ -70,6 +70,9 @@ var sectorsStatusCmd = &cli.Command{
fmt.Printf("SeedH:\t\t%d\n", status.Seed.BlockHeight) fmt.Printf("SeedH:\t\t%d\n", status.Seed.BlockHeight)
fmt.Printf("Proof:\t\t%x\n", status.Proof) fmt.Printf("Proof:\t\t%x\n", status.Proof)
fmt.Printf("Deals:\t\t%v\n", status.Deals) fmt.Printf("Deals:\t\t%v\n", status.Deals)
if status.LastErr != "" {
fmt.Printf("Last Error:\t\t%s\n", status.LastErr)
}
return nil return nil
}, },
} }
@ -132,7 +135,7 @@ var sectorsListCmd = &cli.Command{
fmt.Printf("%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n", fmt.Printf("%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n",
s, s,
api.SectorStateStr(st.State), api.SectorStates[st.State],
yesno(inSSet), yesno(inSSet),
yesno(inPSet), yesno(inPSet),
st.Ticket.BlockHeight, st.Ticket.BlockHeight,

View File

@ -60,6 +60,8 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (api.S
Deals: deals, Deals: deals,
Ticket: info.Ticket.SB(), Ticket: info.Ticket.SB(),
Seed: info.Seed.SB(), Seed: info.Seed.SB(),
LastErr: info.LastErr,
}, nil }, nil
} }

View File

@ -239,7 +239,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull) _, err := w.Write(cbg.CborNull)
return err return err
} }
if _, err := w.Write([]byte{140}); err != nil { if _, err := w.Write([]byte{141}); err != nil {
return err return err
} }
@ -337,6 +337,13 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
} }
} }
// t.t.LastErr (string) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.LastErr)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.LastErr)); err != nil {
return err
}
return nil return nil
} }
@ -351,7 +358,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array") return fmt.Errorf("cbor input should be of type array")
} }
if extra != 12 { if extra != 13 {
return fmt.Errorf("cbor input had wrong number of fields") return fmt.Errorf("cbor input had wrong number of fields")
} }
@ -552,5 +559,15 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
} }
} }
// t.t.LastErr (string) (string)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.LastErr = string(sval)
}
return nil return nil
} }

View File

@ -12,29 +12,24 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
type providerHandlerFunc func(ctx context.Context, deal SectorInfo) (func(*SectorInfo), error) type providerHandlerFunc func(ctx context.Context, deal SectorInfo) *sectorUpdate
func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc, next api.SectorState) { func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc) {
go func() { go func() {
mut, err := cb(ctx, sector) update := cb(ctx, sector)
if err == nil && next == api.SectorNoUpdate { if update == nil {
return return // async
} }
select { select {
case m.sectorUpdated <- sectorUpdate{ case m.sectorUpdated <- *update:
newState: next,
id: sector.SectorID,
err: err,
mut: mut,
}:
case <-m.stop: case <-m.stop:
} }
}() }()
} }
func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
var allocated uint64 var allocated uint64
@ -45,12 +40,12 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
if allocated > ubytes { if allocated > ubytes {
return nil, xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes))
} }
fillerSizes, err := fillersFromRem(ubytes - allocated) fillerSizes, err := fillersFromRem(ubytes - allocated)
if err != nil { if err != nil {
return nil, err return sector.upd().fatal(err)
} }
if len(fillerSizes) > 0 { if len(fillerSizes) > 0 {
@ -59,27 +54,27 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec
pieces, err := m.storeGarbage(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...) pieces, err := m.storeGarbage(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...)
if err != nil { if err != nil {
return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err))
} }
return func(info *SectorInfo) { return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) {
info.Pieces = append(info.Pieces, pieces...) info.Pieces = append(info.Pieces, pieces...)
}, nil })
} }
func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Infow("performing sector replication...", "sector", sector.SectorID) log.Infow("performing sector replication...", "sector", sector.SectorID)
ticket, err := m.tktFn(ctx) ticket, err := m.tktFn(ctx)
if err != nil { if err != nil {
return nil, err return sector.upd().fatal(err)
} }
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos()) rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
if err != nil { if err != nil {
return nil, xerrors.Errorf("seal pre commit failed: %w", err) return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
} }
return func(info *SectorInfo) { return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
info.CommC = rspco.CommC[:] info.CommC = rspco.CommC[:]
info.CommD = rspco.CommD[:] info.CommD = rspco.CommD[:]
info.CommR = rspco.CommR[:] info.CommR = rspco.CommR[:]
@ -88,10 +83,11 @@ func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*Sec
BlockHeight: ticket.BlockHeight, BlockHeight: ticket.BlockHeight,
TicketBytes: ticket.TicketBytes[:], TicketBytes: ticket.TicketBytes[:],
} }
}, nil })
} }
func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
params := &actors.SectorPreCommitInfo{ params := &actors.SectorPreCommitInfo{
SectorNumber: sector.SectorID, SectorNumber: sector.SectorID,
@ -101,7 +97,7 @@ func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorI
} }
enc, aerr := actors.SerializeParams(params) enc, aerr := actors.SerializeParams(params)
if aerr != nil { if aerr != nil {
return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr) return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
} }
msg := &types.Message{ msg := &types.Message{
@ -117,26 +113,26 @@ func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorI
log.Info("submitting precommit for sector: ", sector.SectorID) log.Info("submitting precommit for sector: ", sector.SectorID)
smsg, err := m.api.MpoolPushMessage(ctx, msg) smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
return nil, xerrors.Errorf("pushing message to mpool: %w", err) return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
} }
return func(info *SectorInfo) { return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
mcid := smsg.Cid() mcid := smsg.Cid()
info.PreCommitMessage = &mcid info.PreCommitMessage = &mcid
}, nil })
} }
func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate {
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
log.Info("Sector precommitted: ", sector.SectorID) log.Info("Sector precommitted: ", sector.SectorID)
mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage) mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
if err != nil { if err != nil {
return nil, err return sector.upd().to(api.PreCommitFailed).error(err)
} }
if mw.Receipt.ExitCode != 0 { if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode) log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
return nil, err return sector.upd().to(api.PreCommitFailed).error(err)
} }
log.Info("precommit message landed on chain: ", sector.SectorID) log.Info("precommit message landed on chain: ", sector.SectorID)
@ -146,19 +142,18 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect
err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error { err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight)) rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight))
if err != nil { if err != nil {
return xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
m.sectorUpdated <- *sector.upd().fatal(err)
return err
} }
m.sectorUpdated <- sectorUpdate{ m.sectorUpdated <- *sector.upd().to(api.Committing).state(func(info *SectorInfo) {
newState: api.Committing, info.Seed = SealSeed{
id: sector.SectorID, BlockHeight: randHeight,
mut: func(info *SectorInfo) { TicketBytes: rand,
info.Seed = SealSeed{ }
BlockHeight: randHeight, })
TicketBytes: rand,
}
},
}
return nil return nil
}, func(ctx context.Context, ts *types.TipSet) error { }, func(ctx context.Context, ts *types.TipSet) error {
@ -169,15 +164,15 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect
log.Warn("waitForPreCommitMessage ChainAt errored: ", err) log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
} }
return nil, nil return nil
} }
func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Info("scheduling seal proof computation...") log.Info("scheduling seal proof computation...")
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
if err != nil { if err != nil {
return nil, xerrors.Errorf("computing seal proof failed: %w", err) return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
} }
// TODO: Consider splitting states and persist proof for faster recovery // TODO: Consider splitting states and persist proof for faster recovery
@ -190,7 +185,7 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector
enc, aerr := actors.SerializeParams(params) enc, aerr := actors.SerializeParams(params)
if aerr != nil { if aerr != nil {
return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr) return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
} }
msg := &types.Message{ msg := &types.Message{
@ -205,24 +200,24 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector
smsg, err := m.api.MpoolPushMessage(ctx, msg) smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
log.Error(xerrors.Errorf("pushing message to mpool: %w", err)) return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
} }
// TODO: Separate state before this wait, so we persist message cid? // TODO: Separate state before this wait, so we persist message cid?
mw, err := m.api.StateWaitMsg(ctx, smsg.Cid()) mw, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to wait for porep inclusion: %w", err) return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err))
} }
if mw.Receipt.ExitCode != 0 { if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, smsg.Cid(), sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, params.Proof) log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, smsg.Cid(), sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, params.Proof)
return nil, xerrors.New("UNHANDLED: submitting sector proof failed") return sector.upd().fatal(xerrors.New("UNHANDLED: submitting sector proof failed"))
} }
return func(info *SectorInfo) { return sector.upd().to(api.Proving).state(func(info *SectorInfo) {
mcid := smsg.Cid() mcid := smsg.Cid()
info.CommitMessage = &mcid info.CommitMessage = &mcid
info.Proof = proof info.Proof = proof
}, nil })
} }

114
storage/sector_types.go Normal file
View File

@ -0,0 +1,114 @@
package storage
import (
"context"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
type SealTicket struct {
BlockHeight uint64
TicketBytes []byte
}
func (t *SealTicket) SB() sectorbuilder.SealTicket {
out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type SealSeed struct {
BlockHeight uint64
TicketBytes []byte
}
func (t *SealSeed) SB() sectorbuilder.SealSeed {
out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type Piece struct {
DealID uint64
Size uint64
CommP []byte
}
func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
out.Size = p.Size
copy(out.CommP[:], p.CommP)
return out
}
type SectorInfo struct {
State api.SectorState
SectorID uint64
// Packing
Pieces []Piece
// PreCommit
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
Proof []byte
Ticket SealTicket
PreCommitMessage *cid.Cid
// PreCommitted
Seed SealSeed
// Committing
CommitMessage *cid.Cid
// Debug
LastErr string
}
func (t *SectorInfo) upd() *sectorUpdate {
return &sectorUpdate{id: t.SectorID}
}
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.ppi()
}
return out
}
func (t *SectorInfo) deals() []uint64 {
out := make([]uint64, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.DealID
}
return out
}
func (t *SectorInfo) existingPieces() []uint64 {
out := make([]uint64, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.Size
}
return out
}
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
var out sectorbuilder.RawSealPreCommitOutput
copy(out.CommC[:], t.CommC)
copy(out.CommD[:], t.CommD)
copy(out.CommR[:], t.CommR)
copy(out.CommRLast[:], t.CommRLast)
return out
}

View File

@ -2,9 +2,9 @@ package storage
import ( import (
"context" "context"
"fmt"
"io" "io"
cid "github.com/ipfs/go-cid"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
@ -12,68 +12,6 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
type SealTicket struct {
BlockHeight uint64
TicketBytes []byte
}
func (t *SealTicket) SB() sectorbuilder.SealTicket {
out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type SealSeed struct {
BlockHeight uint64
TicketBytes []byte
}
func (t *SealSeed) SB() sectorbuilder.SealSeed {
out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type Piece struct {
DealID uint64
Size uint64
CommP []byte
}
func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
out.Size = p.Size
copy(out.CommP[:], p.CommP)
return out
}
type SectorInfo struct {
State api.SectorState
SectorID uint64
// Packing
Pieces []Piece
// PreCommit
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
Proof []byte
Ticket SealTicket
PreCommitMessage *cid.Cid
// PreCommitted
Seed SealSeed
// Committing
CommitMessage *cid.Cid
}
type sectorUpdate struct { type sectorUpdate struct {
newState api.SectorState newState api.SectorState
id uint64 id uint64
@ -81,39 +19,40 @@ type sectorUpdate struct {
mut func(*SectorInfo) mut func(*SectorInfo)
} }
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { func (u *sectorUpdate) fatal(err error) *sectorUpdate {
out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces)) return &sectorUpdate{
for i, piece := range t.Pieces { newState: api.FailedUnrecoverable,
out[i] = piece.ppi() id: u.id,
err: err,
mut: u.mut,
} }
return out
} }
func (t *SectorInfo) deals() []uint64 { func (u *sectorUpdate) error(err error) *sectorUpdate {
out := make([]uint64, len(t.Pieces)) return &sectorUpdate{
for i, piece := range t.Pieces { newState: u.newState,
out[i] = piece.DealID id: u.id,
err: err,
mut: u.mut,
} }
return out
} }
func (t *SectorInfo) existingPieces() []uint64 { func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate {
out := make([]uint64, len(t.Pieces)) return &sectorUpdate{
for i, piece := range t.Pieces { newState: u.newState,
out[i] = piece.Size id: u.id,
err: u.err,
mut: m,
} }
return out
} }
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput { func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate {
var out sectorbuilder.RawSealPreCommitOutput return &sectorUpdate{
newState: newState,
copy(out.CommC[:], t.CommC) id: u.id,
copy(out.CommD[:], t.CommD) err: u.err,
copy(out.CommR[:], t.CommR) mut: u.mut,
copy(out.CommRLast[:], t.CommRLast) }
return out
} }
func (m *Miner) sectorStateLoop(ctx context.Context) error { func (m *Miner) sectorStateLoop(ctx context.Context) error {
@ -195,9 +134,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) {
} }
if err := m.sectors.Begin(sector.SectorID, sector); err != nil { if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
// We may have re-sent the proposal log.Errorf("sector tracking failed: %s", err)
log.Errorf("deal tracking failed: %s", err)
m.failSector(sector.SectorID, err)
return return
} }
@ -214,10 +151,15 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) {
} }
func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
log.Infof("Sector %d updated state to %s", update.id, api.SectorStateStr(update.newState)) log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState])
var sector SectorInfo var sector SectorInfo
err := m.sectors.Mutate(update.id, func(s *SectorInfo) error { err := m.sectors.Mutate(update.id, func(s *SectorInfo) error {
s.State = update.newState s.State = update.newState
s.LastErr = ""
if update.err != nil {
s.LastErr = fmt.Sprintf("%+v", update.err)
}
if update.mut != nil { if update.mut != nil {
update.mut(s) update.mut(s)
} }
@ -225,39 +167,80 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
return nil return nil
}) })
if update.err != nil { if update.err != nil {
log.Errorf("sector %d failed: %s", update.id, update.err) log.Errorf("sector %d failed: %+v", update.id, update.err)
m.failSector(update.id, update.err)
return
} }
if err != nil { if err != nil {
m.failSector(update.id, err) log.Errorf("sector %d error: %+v", update.id, err)
return return
} }
/*
* Empty
| |
| v
*<- Packing <- incoming
| |
| v
*<- Unsealed <--> SealFailed
| |
| v
* PreCommitting <--> PreCommitFailed
| | ^
| v |
*<- PreCommitted ------/
| |
| v v--> SealCommitFailed
*<- Committing
| | ^--> CommitFailed
| v
*<- Proving
|
v
FailedUnrecoverable
UndefinedSectorState <- ¯\_()_/¯
| ^
*---------------------/
*/
switch update.newState { switch update.newState {
// Happy path
case api.Packing: case api.Packing:
m.handleSectorUpdate(ctx, sector, m.finishPacking, api.Unsealed) m.handleSectorUpdate(ctx, sector, m.handlePacking)
case api.Unsealed: case api.Unsealed:
m.handleSectorUpdate(ctx, sector, m.sealPreCommit, api.PreCommitting) m.handleSectorUpdate(ctx, sector, m.handleUnsealed)
case api.PreCommitting: case api.PreCommitting:
m.handleSectorUpdate(ctx, sector, m.preCommit, api.PreCommitted) m.handleSectorUpdate(ctx, sector, m.handlePreCommitting)
case api.PreCommitted: case api.PreCommitted:
m.handleSectorUpdate(ctx, sector, m.preCommitted, api.SectorNoUpdate) m.handleSectorUpdate(ctx, sector, m.handlePreCommitted)
case api.Committing: case api.Committing:
m.handleSectorUpdate(ctx, sector, m.committing, api.Proving) m.handleSectorUpdate(ctx, sector, m.handleCommitting)
case api.Proving: case api.Proving:
// TODO: track sector health / expiration // TODO: track sector health / expiration
log.Infof("Proving sector %d", update.id) log.Infof("Proving sector %d", update.id)
case api.SectorNoUpdate: // noop
// Handled failure modes
case api.SealFailed:
log.Warn("sector %d entered unimplemented state 'SealFailed'", update.id)
case api.PreCommitFailed:
log.Warn("sector %d entered unimplemented state 'PreCommitFailed'", update.id)
case api.SealCommitFailed:
log.Warn("sector %d entered unimplemented state 'SealCommitFailed'", update.id)
case api.CommitFailed:
log.Warn("sector %d entered unimplemented state 'CommitFailed'", update.id)
// Fatal errors
case api.UndefinedSectorState:
log.Error("sector update with undefined state!")
case api.FailedUnrecoverable:
log.Errorf("sector %d failed unrecoverably", update.id)
default: default:
log.Errorf("unexpected sector update state: %d", update.newState) log.Errorf("unexpected sector update state: %d", update.newState)
} }
} }
func (m *Miner) failSector(id uint64, err error) {
log.Errorf("sector %d error: %+v", id, err)
}
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
if padreader.PaddedSize(size) != size { if padreader.PaddedSize(size) != size {
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")