sealing: Improve sector log
This commit is contained in:
parent
971fe6fdfd
commit
66d71d9974
@ -2,6 +2,7 @@ package sealing
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
@ -82,10 +83,17 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
/////
|
/////
|
||||||
// First process all events
|
// First process all events
|
||||||
|
|
||||||
|
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
|
e, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("marshaling event for logging: %+v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
l := Log{
|
l := Log{
|
||||||
Timestamp: uint64(time.Now().Unix()),
|
Timestamp: uint64(time.Now().Unix()),
|
||||||
Message: fmt.Sprintf("%s", event),
|
Message: string(e),
|
||||||
Kind: fmt.Sprintf("event;%T", event.User),
|
Kind: fmt.Sprintf("event;%T", event.User),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -201,7 +209,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
|||||||
e.apply(state)
|
e.apply(state)
|
||||||
state.State = api.CommitWait
|
state.State = api.CommitWait
|
||||||
case SectorSeedReady: // seed changed :/
|
case SectorSeedReady: // seed changed :/
|
||||||
if e.seed.Equals(&state.Seed) {
|
if e.Seed.Equals(&state.Seed) {
|
||||||
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
|
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
|
||||||
continue // or it didn't!
|
continue // or it didn't!
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package sealing
|
|||||||
import (
|
import (
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
)
|
)
|
||||||
@ -25,6 +26,7 @@ type SectorRestart struct{}
|
|||||||
func (evt SectorRestart) applyGlobal(*SectorInfo) bool { return false }
|
func (evt SectorRestart) applyGlobal(*SectorInfo) bool { return false }
|
||||||
|
|
||||||
type SectorFatalError struct{ error }
|
type SectorFatalError struct{ error }
|
||||||
|
func (evt SectorFatalError) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||||
|
|
||||||
func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool {
|
func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool {
|
||||||
log.Errorf("Fatal error on sector %d: %+v", state.SectorID, evt.error)
|
log.Errorf("Fatal error on sector %d: %+v", state.SectorID, evt.error)
|
||||||
@ -35,32 +37,32 @@ func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SectorForceState struct {
|
type SectorForceState struct {
|
||||||
state api.SectorState
|
State api.SectorState
|
||||||
}
|
}
|
||||||
|
|
||||||
func (evt SectorForceState) applyGlobal(state *SectorInfo) bool {
|
func (evt SectorForceState) applyGlobal(state *SectorInfo) bool {
|
||||||
state.State = evt.state
|
state.State = evt.State
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Normal path
|
// Normal path
|
||||||
|
|
||||||
type SectorStart struct {
|
type SectorStart struct {
|
||||||
id abi.SectorNumber
|
ID abi.SectorNumber
|
||||||
sectorType abi.RegisteredProof
|
SectorType abi.RegisteredProof
|
||||||
pieces []Piece
|
Pieces []Piece
|
||||||
}
|
}
|
||||||
|
|
||||||
func (evt SectorStart) apply(state *SectorInfo) {
|
func (evt SectorStart) apply(state *SectorInfo) {
|
||||||
state.SectorID = evt.id
|
state.SectorID = evt.ID
|
||||||
state.Pieces = evt.pieces
|
state.Pieces = evt.Pieces
|
||||||
state.SectorType = evt.sectorType
|
state.SectorType = evt.SectorType
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorPacked struct{ pieces []Piece }
|
type SectorPacked struct{ Pieces []Piece }
|
||||||
|
|
||||||
func (evt SectorPacked) apply(state *SectorInfo) {
|
func (evt SectorPacked) apply(state *SectorInfo) {
|
||||||
state.Pieces = append(state.Pieces, evt.pieces...)
|
state.Pieces = append(state.Pieces, evt.Pieces...)
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorPackingFailed struct{ error }
|
type SectorPackingFailed struct{ error }
|
||||||
@ -68,57 +70,59 @@ type SectorPackingFailed struct{ error }
|
|||||||
func (evt SectorPackingFailed) apply(*SectorInfo) {}
|
func (evt SectorPackingFailed) apply(*SectorInfo) {}
|
||||||
|
|
||||||
type SectorSealed struct {
|
type SectorSealed struct {
|
||||||
commR cid.Cid
|
Sealed cid.Cid
|
||||||
commD cid.Cid
|
Unsealed cid.Cid
|
||||||
ticket api.SealTicket
|
Ticket api.SealTicket
|
||||||
}
|
}
|
||||||
|
|
||||||
func (evt SectorSealed) apply(state *SectorInfo) {
|
func (evt SectorSealed) apply(state *SectorInfo) {
|
||||||
commd := evt.commD
|
commd := evt.Unsealed
|
||||||
state.CommD = &commd
|
state.CommD = &commd
|
||||||
commr := evt.commR
|
commr := evt.Sealed
|
||||||
state.CommR = &commr
|
state.CommR = &commr
|
||||||
state.Ticket = evt.ticket
|
state.Ticket = evt.Ticket
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorSealFailed struct{ error }
|
type SectorSealFailed struct{ error }
|
||||||
|
func (evt SectorSealFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||||
func (evt SectorSealFailed) apply(*SectorInfo) {}
|
func (evt SectorSealFailed) apply(*SectorInfo) {}
|
||||||
|
|
||||||
type SectorPreCommitFailed struct{ error }
|
type SectorPreCommitFailed struct{ error }
|
||||||
|
func (evt SectorPreCommitFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||||
func (evt SectorPreCommitFailed) apply(*SectorInfo) {}
|
func (evt SectorPreCommitFailed) apply(*SectorInfo) {}
|
||||||
|
|
||||||
type SectorPreCommitted struct {
|
type SectorPreCommitted struct {
|
||||||
message cid.Cid
|
Message cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (evt SectorPreCommitted) apply(state *SectorInfo) {
|
func (evt SectorPreCommitted) apply(state *SectorInfo) {
|
||||||
state.PreCommitMessage = &evt.message
|
state.PreCommitMessage = &evt.Message
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorSeedReady struct {
|
type SectorSeedReady struct {
|
||||||
seed api.SealSeed
|
Seed api.SealSeed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (evt SectorSeedReady) apply(state *SectorInfo) {
|
func (evt SectorSeedReady) apply(state *SectorInfo) {
|
||||||
state.Seed = evt.seed
|
state.Seed = evt.Seed
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorComputeProofFailed struct{ error }
|
type SectorComputeProofFailed struct{ error }
|
||||||
|
func (evt SectorComputeProofFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||||
|
func (evt SectorComputeProofFailed) apply(*SectorInfo) {}
|
||||||
|
|
||||||
type SectorCommitFailed struct{ error }
|
type SectorCommitFailed struct{ error }
|
||||||
|
func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||||
func (evt SectorCommitFailed) apply(*SectorInfo) {}
|
func (evt SectorCommitFailed) apply(*SectorInfo) {}
|
||||||
|
|
||||||
type SectorCommitted struct {
|
type SectorCommitted struct {
|
||||||
message cid.Cid
|
Message cid.Cid
|
||||||
proof []byte
|
Proof []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (evt SectorCommitted) apply(state *SectorInfo) {
|
func (evt SectorCommitted) apply(state *SectorInfo) {
|
||||||
state.Proof = evt.proof
|
state.Proof = evt.Proof
|
||||||
state.CommitMessage = &evt.message
|
state.CommitMessage = &evt.Message
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorProving struct{}
|
type SectorProving struct{}
|
||||||
@ -130,7 +134,7 @@ type SectorFinalized struct{}
|
|||||||
func (evt SectorFinalized) apply(*SectorInfo) {}
|
func (evt SectorFinalized) apply(*SectorInfo) {}
|
||||||
|
|
||||||
type SectorFinalizeFailed struct{ error }
|
type SectorFinalizeFailed struct{ error }
|
||||||
|
func (evt SectorFinalizeFailed) FormatError(xerrors.Printer) (next error) {return evt.error}
|
||||||
func (evt SectorFinalizeFailed) apply(*SectorInfo) {}
|
func (evt SectorFinalizeFailed) apply(*SectorInfo) {}
|
||||||
|
|
||||||
// Failed state recovery
|
// Failed state recovery
|
||||||
|
@ -76,12 +76,12 @@ func TestSeedRevert(t *testing.T) {
|
|||||||
m.planSingle(SectorSeedReady{})
|
m.planSingle(SectorSeedReady{})
|
||||||
require.Equal(m.t, m.state.State, api.Committing)
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state)
|
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(m.t, m.state.State, api.Committing)
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
// not changing the seed this time
|
// not changing the seed this time
|
||||||
_, err = m.s.plan([]statemachine.Event{{SectorSeedReady{seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state)
|
_, err = m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state)
|
||||||
require.Equal(m.t, m.state.State, api.CommitWait)
|
require.Equal(m.t, m.state.State, api.CommitWait)
|
||||||
|
|
||||||
m.planSingle(SectorProving{})
|
m.planSingle(SectorProving{})
|
||||||
|
@ -150,9 +150,9 @@ func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r i
|
|||||||
func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredProof, pieces []Piece) error {
|
func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredProof, pieces []Piece) error {
|
||||||
log.Infof("Start sealing %d", sid)
|
log.Infof("Start sealing %d", sid)
|
||||||
return m.sectors.Send(uint64(sid), SectorStart{
|
return m.sectors.Send(uint64(sid), SectorStart{
|
||||||
id: sid,
|
ID: sid,
|
||||||
pieces: pieces,
|
Pieces: pieces,
|
||||||
sectorType: rt,
|
SectorType: rt,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
|
|||||||
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctx.Send(SectorPacked{pieces: pieces})
|
return ctx.Send(SectorPacked{Pieces: pieces})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
@ -81,9 +81,9 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
return ctx.Send(SectorSealed{
|
return ctx.Send(SectorSealed{
|
||||||
commD: cids.Unsealed,
|
Unsealed: cids.Unsealed,
|
||||||
commR: cids.Sealed,
|
Sealed: cids.Sealed,
|
||||||
ticket: *ticket,
|
Ticket: *ticket,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,7 +132,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
|||||||
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
|
return ctx.Send(SectorPreCommitted{Message: smsg.Cid()})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
@ -162,7 +162,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.Send(SectorSeedReady{seed: api.SealSeed{
|
ctx.Send(SectorSeedReady{Seed: api.SealSeed{
|
||||||
Epoch: randHeight,
|
Epoch: randHeight,
|
||||||
Value: abi.InteractiveSealRandomness(rand),
|
Value: abi.InteractiveSealRandomness(rand),
|
||||||
}})
|
}})
|
||||||
@ -229,8 +229,8 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
|
|||||||
}
|
}
|
||||||
|
|
||||||
return ctx.Send(SectorCommitted{
|
return ctx.Send(SectorCommitted{
|
||||||
proof: proof,
|
Proof: proof,
|
||||||
message: smsg.Cid(),
|
Message: smsg.Cid(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user