From 66d71d9974ebd20a4b868c7b10fb209ac2cb36c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 22 Mar 2020 21:44:27 +0100 Subject: [PATCH] sealing: Improve sector log --- storage/sealing/fsm.go | 12 +++++-- storage/sealing/fsm_events.go | 60 +++++++++++++++++++---------------- storage/sealing/fsm_test.go | 4 +-- storage/sealing/sealing.go | 6 ++-- storage/sealing/states.go | 16 +++++----- 5 files changed, 55 insertions(+), 43 deletions(-) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index bd3031aca..89d27178c 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -2,6 +2,7 @@ package sealing import ( "context" + "encoding/json" "fmt" "reflect" "time" @@ -82,10 +83,17 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta ///// // First process all events + for _, event := range events { + e, err := json.Marshal(event) + if err != nil { + log.Errorf("marshaling event for logging: %+v", err) + continue + } + l := Log{ Timestamp: uint64(time.Now().Unix()), - Message: fmt.Sprintf("%s", event), + Message: string(e), Kind: fmt.Sprintf("event;%T", event.User), } @@ -201,7 +209,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { e.apply(state) state.State = api.CommitWait case SectorSeedReady: // seed changed :/ - if e.seed.Equals(&state.Seed) { + if e.Seed.Equals(&state.Seed) { log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change") continue // or it didn't! } diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index e809685eb..d358ba360 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -3,6 +3,7 @@ package sealing import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/ipfs/go-cid" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" ) @@ -25,6 +26,7 @@ type SectorRestart struct{} func (evt SectorRestart) applyGlobal(*SectorInfo) bool { return false } type SectorFatalError struct{ error } +func (evt SectorFatalError) FormatError(xerrors.Printer) (next error) {return evt.error} func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool { log.Errorf("Fatal error on sector %d: %+v", state.SectorID, evt.error) @@ -35,32 +37,32 @@ func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool { } type SectorForceState struct { - state api.SectorState + State api.SectorState } func (evt SectorForceState) applyGlobal(state *SectorInfo) bool { - state.State = evt.state + state.State = evt.State return true } // Normal path type SectorStart struct { - id abi.SectorNumber - sectorType abi.RegisteredProof - pieces []Piece + ID abi.SectorNumber + SectorType abi.RegisteredProof + Pieces []Piece } func (evt SectorStart) apply(state *SectorInfo) { - state.SectorID = evt.id - state.Pieces = evt.pieces - state.SectorType = evt.sectorType + state.SectorID = evt.ID + state.Pieces = evt.Pieces + state.SectorType = evt.SectorType } -type SectorPacked struct{ pieces []Piece } +type SectorPacked struct{ Pieces []Piece } func (evt SectorPacked) apply(state *SectorInfo) { - state.Pieces = append(state.Pieces, evt.pieces...) + state.Pieces = append(state.Pieces, evt.Pieces...) } type SectorPackingFailed struct{ error } @@ -68,57 +70,59 @@ type SectorPackingFailed struct{ error } func (evt SectorPackingFailed) apply(*SectorInfo) {} type SectorSealed struct { - commR cid.Cid - commD cid.Cid - ticket api.SealTicket + Sealed cid.Cid + Unsealed cid.Cid + Ticket api.SealTicket } func (evt SectorSealed) apply(state *SectorInfo) { - commd := evt.commD + commd := evt.Unsealed state.CommD = &commd - commr := evt.commR + commr := evt.Sealed state.CommR = &commr - state.Ticket = evt.ticket + state.Ticket = evt.Ticket } type SectorSealFailed struct{ error } - +func (evt SectorSealFailed) FormatError(xerrors.Printer) (next error) {return evt.error} func (evt SectorSealFailed) apply(*SectorInfo) {} type SectorPreCommitFailed struct{ error } - +func (evt SectorPreCommitFailed) FormatError(xerrors.Printer) (next error) {return evt.error} func (evt SectorPreCommitFailed) apply(*SectorInfo) {} type SectorPreCommitted struct { - message cid.Cid + Message cid.Cid } func (evt SectorPreCommitted) apply(state *SectorInfo) { - state.PreCommitMessage = &evt.message + state.PreCommitMessage = &evt.Message } type SectorSeedReady struct { - seed api.SealSeed + Seed api.SealSeed } func (evt SectorSeedReady) apply(state *SectorInfo) { - state.Seed = evt.seed + state.Seed = evt.Seed } type SectorComputeProofFailed struct{ error } +func (evt SectorComputeProofFailed) FormatError(xerrors.Printer) (next error) {return evt.error} +func (evt SectorComputeProofFailed) apply(*SectorInfo) {} type SectorCommitFailed struct{ error } - +func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) {return evt.error} func (evt SectorCommitFailed) apply(*SectorInfo) {} type SectorCommitted struct { - message cid.Cid - proof []byte + Message cid.Cid + Proof []byte } func (evt SectorCommitted) apply(state *SectorInfo) { - state.Proof = evt.proof - state.CommitMessage = &evt.message + state.Proof = evt.Proof + state.CommitMessage = &evt.Message } type SectorProving struct{} @@ -130,7 +134,7 @@ type SectorFinalized struct{} func (evt SectorFinalized) apply(*SectorInfo) {} type SectorFinalizeFailed struct{ error } - +func (evt SectorFinalizeFailed) FormatError(xerrors.Printer) (next error) {return evt.error} func (evt SectorFinalizeFailed) apply(*SectorInfo) {} // Failed state recovery diff --git a/storage/sealing/fsm_test.go b/storage/sealing/fsm_test.go index 41becc4f3..9b1a5d3e8 100644 --- a/storage/sealing/fsm_test.go +++ b/storage/sealing/fsm_test.go @@ -76,12 +76,12 @@ func TestSeedRevert(t *testing.T) { m.planSingle(SectorSeedReady{}) require.Equal(m.t, m.state.State, api.Committing) - _, err := m.s.plan([]statemachine.Event{{SectorSeedReady{seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state) + _, err := m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state) require.NoError(t, err) require.Equal(m.t, m.state.State, api.Committing) // not changing the seed this time - _, err = m.s.plan([]statemachine.Event{{SectorSeedReady{seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state) + _, err = m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state) require.Equal(m.t, m.state.State, api.CommitWait) m.planSingle(SectorProving{}) diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index 3c50a1507..bcfdab3b2 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -150,9 +150,9 @@ func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r i func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredProof, pieces []Piece) error { log.Infof("Start sealing %d", sid) return m.sectors.Send(uint64(sid), SectorStart{ - id: sid, - pieces: pieces, - sectorType: rt, + ID: sid, + Pieces: pieces, + SectorType: rt, }) } diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 24f44fdf0..3e6037b6d 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -46,7 +46,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) } - return ctx.Send(SectorPacked{pieces: pieces}) + return ctx.Send(SectorPacked{Pieces: pieces}) } func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { @@ -81,9 +81,9 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er } return ctx.Send(SectorSealed{ - commD: cids.Unsealed, - commR: cids.Sealed, - ticket: *ticket, + Unsealed: cids.Unsealed, + Sealed: cids.Sealed, + Ticket: *ticket, }) } @@ -132,7 +132,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } - return ctx.Send(SectorPreCommitted{message: smsg.Cid()}) + return ctx.Send(SectorPreCommitted{Message: smsg.Cid()}) } func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error { @@ -162,7 +162,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er return err } - ctx.Send(SectorSeedReady{seed: api.SealSeed{ + ctx.Send(SectorSeedReady{Seed: api.SealSeed{ Epoch: randHeight, Value: abi.InteractiveSealRandomness(rand), }}) @@ -229,8 +229,8 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) } return ctx.Send(SectorCommitted{ - proof: proof, - message: smsg.Cid(), + Proof: proof, + Message: smsg.Cid(), }) }