sealing: Improve sector log
This commit is contained in:
parent
80c774ec49
commit
2fc1114ad2
12
fsm.go
12
fsm.go
@ -2,6 +2,7 @@ package sealing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
@ -82,10 +83,17 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
/////
|
||||
// First process all events
|
||||
|
||||
|
||||
for _, event := range events {
|
||||
e, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
log.Errorf("marshaling event for logging: %+v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
l := Log{
|
||||
Timestamp: uint64(time.Now().Unix()),
|
||||
Message: fmt.Sprintf("%s", event),
|
||||
Message: string(e),
|
||||
Kind: fmt.Sprintf("event;%T", event.User),
|
||||
}
|
||||
|
||||
@ -201,7 +209,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
||||
e.apply(state)
|
||||
state.State = api.CommitWait
|
||||
case SectorSeedReady: // seed changed :/
|
||||
if e.seed.Equals(&state.Seed) {
|
||||
if e.Seed.Equals(&state.Seed) {
|
||||
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
|
||||
continue // or it didn't!
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package sealing
|
||||
import (
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
)
|
||||
@ -25,6 +26,7 @@ type SectorRestart struct{}
|
||||
func (evt SectorRestart) applyGlobal(*SectorInfo) bool { return false }
|
||||
|
||||
type SectorFatalError struct{ error }
|
||||
func (evt SectorFatalError) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||
|
||||
func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool {
|
||||
log.Errorf("Fatal error on sector %d: %+v", state.SectorID, evt.error)
|
||||
@ -35,32 +37,32 @@ func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool {
|
||||
}
|
||||
|
||||
type SectorForceState struct {
|
||||
state api.SectorState
|
||||
State api.SectorState
|
||||
}
|
||||
|
||||
func (evt SectorForceState) applyGlobal(state *SectorInfo) bool {
|
||||
state.State = evt.state
|
||||
state.State = evt.State
|
||||
return true
|
||||
}
|
||||
|
||||
// Normal path
|
||||
|
||||
type SectorStart struct {
|
||||
id abi.SectorNumber
|
||||
sectorType abi.RegisteredProof
|
||||
pieces []Piece
|
||||
ID abi.SectorNumber
|
||||
SectorType abi.RegisteredProof
|
||||
Pieces []Piece
|
||||
}
|
||||
|
||||
func (evt SectorStart) apply(state *SectorInfo) {
|
||||
state.SectorID = evt.id
|
||||
state.Pieces = evt.pieces
|
||||
state.SectorType = evt.sectorType
|
||||
state.SectorID = evt.ID
|
||||
state.Pieces = evt.Pieces
|
||||
state.SectorType = evt.SectorType
|
||||
}
|
||||
|
||||
type SectorPacked struct{ pieces []Piece }
|
||||
type SectorPacked struct{ Pieces []Piece }
|
||||
|
||||
func (evt SectorPacked) apply(state *SectorInfo) {
|
||||
state.Pieces = append(state.Pieces, evt.pieces...)
|
||||
state.Pieces = append(state.Pieces, evt.Pieces...)
|
||||
}
|
||||
|
||||
type SectorPackingFailed struct{ error }
|
||||
@ -68,57 +70,59 @@ type SectorPackingFailed struct{ error }
|
||||
func (evt SectorPackingFailed) apply(*SectorInfo) {}
|
||||
|
||||
type SectorSealed struct {
|
||||
commR cid.Cid
|
||||
commD cid.Cid
|
||||
ticket api.SealTicket
|
||||
Sealed cid.Cid
|
||||
Unsealed cid.Cid
|
||||
Ticket api.SealTicket
|
||||
}
|
||||
|
||||
func (evt SectorSealed) apply(state *SectorInfo) {
|
||||
commd := evt.commD
|
||||
commd := evt.Unsealed
|
||||
state.CommD = &commd
|
||||
commr := evt.commR
|
||||
commr := evt.Sealed
|
||||
state.CommR = &commr
|
||||
state.Ticket = evt.ticket
|
||||
state.Ticket = evt.Ticket
|
||||
}
|
||||
|
||||
type SectorSealFailed struct{ error }
|
||||
|
||||
func (evt SectorSealFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||
func (evt SectorSealFailed) apply(*SectorInfo) {}
|
||||
|
||||
type SectorPreCommitFailed struct{ error }
|
||||
|
||||
func (evt SectorPreCommitFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||
func (evt SectorPreCommitFailed) apply(*SectorInfo) {}
|
||||
|
||||
type SectorPreCommitted struct {
|
||||
message cid.Cid
|
||||
Message cid.Cid
|
||||
}
|
||||
|
||||
func (evt SectorPreCommitted) apply(state *SectorInfo) {
|
||||
state.PreCommitMessage = &evt.message
|
||||
state.PreCommitMessage = &evt.Message
|
||||
}
|
||||
|
||||
type SectorSeedReady struct {
|
||||
seed api.SealSeed
|
||||
Seed api.SealSeed
|
||||
}
|
||||
|
||||
func (evt SectorSeedReady) apply(state *SectorInfo) {
|
||||
state.Seed = evt.seed
|
||||
state.Seed = evt.Seed
|
||||
}
|
||||
|
||||
type SectorComputeProofFailed struct{ error }
|
||||
func (evt SectorComputeProofFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||
func (evt SectorComputeProofFailed) apply(*SectorInfo) {}
|
||||
|
||||
type SectorCommitFailed struct{ error }
|
||||
|
||||
func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||
func (evt SectorCommitFailed) apply(*SectorInfo) {}
|
||||
|
||||
type SectorCommitted struct {
|
||||
message cid.Cid
|
||||
proof []byte
|
||||
Message cid.Cid
|
||||
Proof []byte
|
||||
}
|
||||
|
||||
func (evt SectorCommitted) apply(state *SectorInfo) {
|
||||
state.Proof = evt.proof
|
||||
state.CommitMessage = &evt.message
|
||||
state.Proof = evt.Proof
|
||||
state.CommitMessage = &evt.Message
|
||||
}
|
||||
|
||||
type SectorProving struct{}
|
||||
@ -130,7 +134,7 @@ type SectorFinalized struct{}
|
||||
func (evt SectorFinalized) apply(*SectorInfo) {}
|
||||
|
||||
type SectorFinalizeFailed struct{ error }
|
||||
|
||||
func (evt SectorFinalizeFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||
func (evt SectorFinalizeFailed) apply(*SectorInfo) {}
|
||||
|
||||
// Failed state recovery
|
||||
|
@ -76,12 +76,12 @@ func TestSeedRevert(t *testing.T) {
|
||||
m.planSingle(SectorSeedReady{})
|
||||
require.Equal(m.t, m.state.State, api.Committing)
|
||||
|
||||
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state)
|
||||
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state)
|
||||
require.NoError(t, err)
|
||||
require.Equal(m.t, m.state.State, api.Committing)
|
||||
|
||||
// not changing the seed this time
|
||||
_, err = m.s.plan([]statemachine.Event{{SectorSeedReady{seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state)
|
||||
_, err = m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state)
|
||||
require.Equal(m.t, m.state.State, api.CommitWait)
|
||||
|
||||
m.planSingle(SectorProving{})
|
||||
|
@ -150,9 +150,9 @@ func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r i
|
||||
func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredProof, pieces []Piece) error {
|
||||
log.Infof("Start sealing %d", sid)
|
||||
return m.sectors.Send(uint64(sid), SectorStart{
|
||||
id: sid,
|
||||
pieces: pieces,
|
||||
sectorType: rt,
|
||||
ID: sid,
|
||||
Pieces: pieces,
|
||||
SectorType: rt,
|
||||
})
|
||||
}
|
||||
|
||||
|
16
states.go
16
states.go
@ -46,7 +46,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
|
||||
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
||||
}
|
||||
|
||||
return ctx.Send(SectorPacked{pieces: pieces})
|
||||
return ctx.Send(SectorPacked{Pieces: pieces})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
||||
@ -81,9 +81,9 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
|
||||
}
|
||||
|
||||
return ctx.Send(SectorSealed{
|
||||
commD: cids.Unsealed,
|
||||
commR: cids.Sealed,
|
||||
ticket: *ticket,
|
||||
Unsealed: cids.Unsealed,
|
||||
Sealed: cids.Sealed,
|
||||
Ticket: *ticket,
|
||||
})
|
||||
}
|
||||
|
||||
@ -132,7 +132,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
||||
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
|
||||
return ctx.Send(SectorPreCommitted{Message: smsg.Cid()})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
|
||||
@ -162,7 +162,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
|
||||
return err
|
||||
}
|
||||
|
||||
ctx.Send(SectorSeedReady{seed: api.SealSeed{
|
||||
ctx.Send(SectorSeedReady{Seed: api.SealSeed{
|
||||
Epoch: randHeight,
|
||||
Value: abi.InteractiveSealRandomness(rand),
|
||||
}})
|
||||
@ -229,8 +229,8 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
|
||||
}
|
||||
|
||||
return ctx.Send(SectorCommitted{
|
||||
proof: proof,
|
||||
message: smsg.Cid(),
|
||||
Proof: proof,
|
||||
Message: smsg.Cid(),
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user