Merge pull request #792 from filecoin-project/fix/precommit-cancel-commit

Handle reverts in precommit more correctly
This commit is contained in:
Łukasz Magiera 2019-12-09 15:53:25 +01:00 committed by GitHub
commit d18179d347
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 74 additions and 36 deletions

View File

@ -87,6 +87,8 @@ type SectorInfo struct {
Deals []uint64 Deals []uint64
Ticket sectorbuilder.SealTicket Ticket sectorbuilder.SealTicket
Seed sectorbuilder.SealSeed Seed sectorbuilder.SealSeed
Retries uint64
LastErr string LastErr string
} }

View File

@ -26,6 +26,9 @@ const SlashablePowerDelay = 20
// Epochs // Epochs
const InteractivePoRepDelay = 2 const InteractivePoRepDelay = 2
// Epochs
const InteractivePoRepConfidence = 6
func init() { func init() {
os.Setenv("TRUST_PARAMS", "1") os.Setenv("TRUST_PARAMS", "1")
} }

View File

@ -28,3 +28,6 @@ const SlashablePowerDelay = 200
// Epochs // Epochs
const InteractivePoRepDelay = 8 const InteractivePoRepDelay = 8
// Epochs
const InteractivePoRepConfidence = 6

View File

@ -5,7 +5,7 @@ import (
"io" "io"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
cid "github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
) )

View File

@ -5,7 +5,7 @@ import (
"io" "io"
"math" "math"
cid "github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
) )

View File

@ -73,6 +73,7 @@ var sectorsStatusCmd = &cli.Command{
fmt.Printf("SeedH:\t\t%d\n", status.Seed.BlockHeight) fmt.Printf("SeedH:\t\t%d\n", status.Seed.BlockHeight)
fmt.Printf("Proof:\t\t%x\n", status.Proof) fmt.Printf("Proof:\t\t%x\n", status.Proof)
fmt.Printf("Deals:\t\t%v\n", status.Deals) fmt.Printf("Deals:\t\t%v\n", status.Deals)
fmt.Printf("Retries:\t\t%d\n", status.Retries)
if status.LastErr != "" { if status.LastErr != "" {
fmt.Printf("Last Error:\t\t%s\n", status.LastErr) fmt.Printf("Last Error:\t\t%s\n", status.LastErr)
} }

View File

@ -167,6 +167,7 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (api.S
Deals: deals, Deals: deals,
Ticket: info.Ticket.SB(), Ticket: info.Ticket.SB(),
Seed: info.Seed.SB(), Seed: info.Seed.SB(),
Retries: info.Nonce,
LastErr: info.LastErr, LastErr: info.LastErr,
}, nil }, nil
@ -203,7 +204,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
} }
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state api.SectorState) error { func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state api.SectorState) error {
return sm.Miner.UpdateSectorState(ctx, id, state) return sm.Miner.UpdateSectorState(ctx, id, storage.NonceIncrement, state)
} }
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {

View File

@ -239,7 +239,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull) _, err := w.Write(cbg.CborNull)
return err return err
} }
if _, err := w.Write([]byte{141}); err != nil { if _, err := w.Write([]byte{142}); err != nil {
return err return err
} }
@ -253,6 +253,11 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err return err
} }
// t.t.Nonce (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Nonce))); err != nil {
return err
}
// t.t.Pieces ([]storage.Piece) (slice) // t.t.Pieces ([]storage.Piece) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Pieces)))); err != nil { if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Pieces)))); err != nil {
return err return err
@ -358,7 +363,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array") return fmt.Errorf("cbor input should be of type array")
} }
if extra != 13 { if extra != 14 {
return fmt.Errorf("cbor input had wrong number of fields") return fmt.Errorf("cbor input had wrong number of fields")
} }
@ -382,6 +387,16 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("wrong type for uint64 field") return fmt.Errorf("wrong type for uint64 field")
} }
t.SectorID = uint64(extra) t.SectorID = uint64(extra)
// t.t.Nonce (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Nonce = uint64(extra)
// t.t.Pieces ([]storage.Piece) (slice) // t.t.Pieces ([]storage.Piece) (slice)
maj, extra, err = cbg.CborReadHeader(br) maj, extra, err = cbg.CborReadHeader(br)

View File

@ -137,6 +137,8 @@ func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sect
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight) log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
updateNonce := sector.Nonce
err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error { err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight)) rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight))
if err != nil { if err != nil {
@ -146,19 +148,21 @@ func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sect
return err return err
} }
m.sectorUpdated <- *sector.upd().to(api.Committing).state(func(info *SectorInfo) { m.sectorUpdated <- *sector.upd().to(api.Committing).setNonce(updateNonce).state(func(info *SectorInfo) {
info.Seed = SealSeed{ info.Seed = SealSeed{
BlockHeight: randHeight, BlockHeight: randHeight,
TicketBytes: rand, TicketBytes: rand,
} }
}) })
updateNonce++
return nil return nil
}, func(ctx context.Context, ts *types.TipSet) error { }, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("revert in interactive commit sector step") log.Warn("revert in interactive commit sector step")
// TODO: need to cancel running process and restart... // TODO: need to cancel running process and restart...
return nil return nil
}, 3, mw.TipSet.Height()+build.InteractivePoRepDelay) }, build.InteractivePoRepConfidence, mw.TipSet.Height()+build.InteractivePoRepDelay)
if err != nil { if err != nil {
log.Warn("waitForPreCommitMessage ChainAt errored: ", err) log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
} }

View File

@ -49,6 +49,7 @@ func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
type SectorInfo struct { type SectorInfo struct {
State api.SectorState State api.SectorState
SectorID uint64 SectorID uint64
Nonce uint64
// Packing // Packing
@ -75,7 +76,7 @@ type SectorInfo struct {
} }
func (t *SectorInfo) upd() *sectorUpdate { func (t *SectorInfo) upd() *sectorUpdate {
return &sectorUpdate{id: t.SectorID} return &sectorUpdate{id: t.SectorID, nonce: t.Nonce}
} }
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
@ -12,53 +13,47 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
const NonceIncrement = math.MaxUint64
type sectorUpdate struct { type sectorUpdate struct {
newState api.SectorState newState api.SectorState
id uint64 id uint64
err error err error
nonce uint64
mut func(*SectorInfo) mut func(*SectorInfo)
} }
func (u *sectorUpdate) fatal(err error) *sectorUpdate { func (u *sectorUpdate) fatal(err error) *sectorUpdate {
return &sectorUpdate{ u.newState = api.FailedUnrecoverable
newState: api.FailedUnrecoverable, u.err = err
id: u.id, return u
err: err,
mut: u.mut,
}
} }
func (u *sectorUpdate) error(err error) *sectorUpdate { func (u *sectorUpdate) error(err error) *sectorUpdate {
return &sectorUpdate{ u.err = err
newState: u.newState, return u
id: u.id,
err: err,
mut: u.mut,
}
} }
func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate { func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate {
return &sectorUpdate{ u.mut = m
newState: u.newState, return u
id: u.id,
err: u.err,
mut: m,
}
} }
func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate { func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate {
return &sectorUpdate{ u.newState = newState
newState: newState, return u
id: u.id,
err: u.err,
mut: u.mut,
}
} }
func (m *Miner) UpdateSectorState(ctx context.Context, sector uint64, state api.SectorState) error { func (u *sectorUpdate) setNonce(nc uint64) *sectorUpdate {
u.nonce = nc
return u
}
func (m *Miner) UpdateSectorState(ctx context.Context, sector uint64, snonce uint64, state api.SectorState) error {
select { select {
case m.sectorUpdated <- sectorUpdate{ case m.sectorUpdated <- sectorUpdate{
newState: state, newState: state,
nonce: snonce,
id: sector, id: sector,
}: }:
return nil return nil
@ -78,6 +73,7 @@ func (m *Miner) sectorStateLoop(ctx context.Context) error {
select { select {
case m.sectorUpdated <- sectorUpdate{ case m.sectorUpdated <- sectorUpdate{
newState: si.State, newState: si.State,
nonce: si.Nonce,
id: si.SectorID, id: si.SectorID,
err: nil, err: nil,
mut: nil, mut: nil,
@ -166,6 +162,15 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState]) log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState])
var sector SectorInfo var sector SectorInfo
err := m.sectors.Mutate(update.id, func(s *SectorInfo) error { err := m.sectors.Mutate(update.id, func(s *SectorInfo) error {
if update.nonce < s.Nonce {
return xerrors.Errorf("update nonce too low, ignoring (%d < %d)", update.nonce, s.Nonce)
}
if update.nonce != NonceIncrement {
s.Nonce = update.nonce
} else {
s.Nonce++ // forced update
}
s.State = update.newState s.State = update.newState
if update.err != nil { if update.err != nil {
if s.LastErr != "" { if s.LastErr != "" {
@ -184,7 +189,7 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
log.Errorf("sector %d failed: %+v", update.id, update.err) log.Errorf("sector %d failed: %+v", update.id, update.err)
} }
if err != nil { if err != nil {
log.Errorf("sector %d error: %+v", update.id, err) log.Errorf("sector %d update error: %+v", update.id, err)
return return
} }
@ -203,10 +208,13 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
| | ^ | | ^
| v | | v |
*<- PreCommitted ------/ *<- PreCommitted ------/
| | | |||
| v v--> SealCommitFailed | vvv v--> SealCommitFailed
*<- Committing *<- Committing
| | ^--> CommitFailed | | ^--> CommitFailed
| v ^
*<- CommitWait ---/
| |
| v | v
*<- Proving *<- Proving
| |