From efaf2c720b279bf9c1b1acda647171cac83eb65d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 20 Jan 2020 23:03:50 +0100 Subject: [PATCH 01/24] sealing: WIP SectorInfo sanity checks --- storage/sealing/checks.go | 73 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 storage/sealing/checks.go diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go new file mode 100644 index 000000000..05647de6c --- /dev/null +++ b/storage/sealing/checks.go @@ -0,0 +1,73 @@ +package sealing + +import ( + "context" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" +) + +func checkPieces(ctx context.Context, si *SectorInfo, api sealingApi) error { + for i, piece := range si.Pieces { + deal, err := api.StateMarketStorageDeal(ctx, piece.DealID, nil) + if err != nil { + return xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err) + } + + if string(deal.PieceRef) != string(piece.CommP) { + return xerrors.Errorf("piece %d of sector %d refers deal %d with wrong CommP: %x != %x", i, si.SectorID, piece.DealID, piece.CommP, deal.PieceRef) + } + + if piece.Size != deal.PieceSize { + return xerrors.Errorf("piece %d of sector %d refers deal %d with different size: %d != %d", i, si.SectorID, piece.DealID, piece.Size, deal.PieceSize) + } + } + + return nil +} + +func checkSeal(ctx context.Context, maddr address.Address, si *SectorInfo, api sealingApi) (err error) { + ssize, err := api.StateMinerSectorSize(ctx, maddr, nil) + if err != nil { + return err + } + + ccparams, err := actors.SerializeParams(&actors.ComputeDataCommitmentParams{ + DealIDs: si.deals(), + SectorSize: ssize, + }) + if err != nil { + return xerrors.Errorf("computing params for ComputeDataCommitment: %w", err) + } + + ccmt := &types.Message{ + To: actors.StorageMarketAddress, + From: actors.StorageMarketAddress, + Value: types.NewInt(0), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(9999999999), + Method: actors.SMAMethods.ComputeDataCommitment, + Params: ccparams, + } + r, err := api.StateCall(ctx, ccmt, nil) + if err != nil { + return xerrors.Errorf("calling ComputeDataCommitment: %w", err) + } + if r.ExitCode != 0 { + return xerrors.Errorf("receipt for ComputeDataCommitment han exit code %d", r.ExitCode) + } + if string(r.Return) != string(si.CommD) { + return xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD) + } + + + // TODO: Validate ticket + // TODO: Verify commp / commr / proof + // TODO: (StateCall PreCommit) + return nil + + +} From e4de5c55acc84a52db0d1b045771687a3dc18f72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 20 Jan 2020 23:04:46 +0100 Subject: [PATCH 02/24] sealing: Some state renaming --- api/api_storage.go | 4 ++-- storage/sealing/fsm.go | 12 ++++++------ storage/sealing/fsm_events.go | 2 +- storage/sealing/fsm_test.go | 4 ++-- storage/sealing/sealing.go | 4 +++- storage/sealing/states.go | 4 ++-- storage/sealing/types.go | 4 ++-- 7 files changed, 18 insertions(+), 16 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 3fa2ab6d0..e01f859cf 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -19,7 +19,7 @@ const ( Unsealed // sealing / queued PreCommitting // on chain pre-commit - PreCommitted // waiting for seed + WaitSeed // waiting for seed Committing CommitWait // waiting for message to land on chain Proving @@ -61,7 +61,7 @@ var SectorStates = []string{ Packing: "Packing", Unsealed: "Unsealed", PreCommitting: "PreCommitting", - PreCommitted: "PreCommitted", + WaitSeed: "WaitSeed", Committing: "Committing", CommitWait: "CommitWait", Proving: "Proving", diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index 5c37020d2..7459b7487 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -36,10 +36,10 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ on(SectorSealFailed{}, api.SealFailed), ), api.PreCommitting: planOne( - on(SectorPreCommitted{}, api.PreCommitted), + on(SectorPreCommitted{}, api.WaitSeed), on(SectorPreCommitFailed{}, api.PreCommitFailed), ), - api.PreCommitted: planOne( + api.WaitSeed: planOne( on(SectorSeedReady{}, api.Committing), on(SectorPreCommitFailed{}, api.PreCommitFailed), ), @@ -87,7 +87,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta * PreCommitting <--> PreCommitFailed | | ^ | v | - *<- PreCommitted ------/ + *<- WaitSeed ----------/ | ||| | vvv v--> SealCommitFailed *<- Committing @@ -115,8 +115,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handleUnsealed, nil case api.PreCommitting: return m.handlePreCommitting, nil - case api.PreCommitted: - return m.handlePreCommitted, nil + case api.WaitSeed: + return m.handleWaitSeed, nil case api.Committing: return m.handleCommitting, nil case api.CommitWait: @@ -172,7 +172,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { e.apply(state) state.State = api.Committing return nil - case SectorSealCommitFailed: + case SectorComputeProofFailed: state.State = api.SealCommitFailed case SectorSealFailed: state.State = api.CommitFailed diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index 8392d13e7..662379e9a 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -96,7 +96,7 @@ func (evt SectorSeedReady) apply(state *SectorInfo) { state.Seed = evt.seed } -type SectorSealCommitFailed struct{ error } +type SectorComputeProofFailed struct{ error } type SectorCommitFailed struct{ error } type SectorCommitted struct { message cid.Cid diff --git a/storage/sealing/fsm_test.go b/storage/sealing/fsm_test.go index a76a547dc..2dada5470 100644 --- a/storage/sealing/fsm_test.go +++ b/storage/sealing/fsm_test.go @@ -41,7 +41,7 @@ func TestHappyPath(t *testing.T) { require.Equal(m.t, m.state.State, api.PreCommitting) m.planSingle(SectorPreCommitted{}) - require.Equal(m.t, m.state.State, api.PreCommitted) + require.Equal(m.t, m.state.State, api.WaitSeed) m.planSingle(SectorSeedReady{}) require.Equal(m.t, m.state.State, api.Committing) @@ -67,7 +67,7 @@ func TestSeedRevert(t *testing.T) { require.Equal(m.t, m.state.State, api.PreCommitting) m.planSingle(SectorPreCommitted{}) - require.Equal(m.t, m.state.State, api.PreCommitted) + require.Equal(m.t, m.state.State, api.WaitSeed) m.planSingle(SectorSeedReady{}) require.Equal(m.t, m.state.State, api.Committing) diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index 5b8ce1bb5..5a486f6ef 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -6,7 +6,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/lotus/lib/padreader" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -14,9 +13,11 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/padreader" "github.com/filecoin-project/lotus/lib/statemachine" ) @@ -37,6 +38,7 @@ type sealingApi interface { // TODO: trim down StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) + StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 1bebdb243..ad4a8f542 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -97,7 +97,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorPreCommitted{message: smsg.Cid()}) } -func (m *Sealing) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error { // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts log.Info("Sector precommitted: ", sector.SectorID) mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) @@ -147,7 +147,7 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) if err != nil { - return ctx.Send(SectorSealCommitFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) + return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) } // TODO: Consider splitting states and persist proof for faster recovery diff --git a/storage/sealing/types.go b/storage/sealing/types.go index 6e248050e..096e83d08 100644 --- a/storage/sealing/types.go +++ b/storage/sealing/types.go @@ -49,7 +49,7 @@ func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) { type SectorInfo struct { State api.SectorState SectorID uint64 - Nonce uint64 + Nonce uint64 // TODO: remove // Packing @@ -63,7 +63,7 @@ type SectorInfo struct { PreCommitMessage *cid.Cid - // PreCommitted + // WaitSeed Seed SealSeed // Committing From 1e877eae86477733232eaf5b25a555a2d37025f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 22 Jan 2020 03:41:39 +0100 Subject: [PATCH 03/24] sealing: Add Log field to SectorInfo --- gen/main.go | 1 + storage/sealing/cbor_gen.go | 273 +++++++++++++++++++++++++++++++++++- storage/sealing/checks.go | 2 - storage/sealing/types.go | 16 ++- 4 files changed, 286 insertions(+), 6 deletions(-) diff --git a/gen/main.go b/gen/main.go index c0698ca47..e258d4ae9 100644 --- a/gen/main.go +++ b/gen/main.go @@ -127,6 +127,7 @@ func main() { sealing.SealSeed{}, sealing.Piece{}, sealing.SectorInfo{}, + sealing.Log{}, ) if err != nil { fmt.Println(err) diff --git a/storage/sealing/cbor_gen.go b/storage/sealing/cbor_gen.go index 1e48bc7d5..e13bcf43a 100644 --- a/storage/sealing/cbor_gen.go +++ b/storage/sealing/cbor_gen.go @@ -395,7 +395,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{173}); err != nil { + if _, err := w.Write([]byte{174}); err != nil { return err } @@ -661,6 +661,31 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { if _, err := w.Write([]byte(t.LastErr)); err != nil { return err } + + // t.Log ([]sealing.Log) (slice) + if len("Log") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Log\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Log")))); err != nil { + return err + } + if _, err := w.Write([]byte("Log")); err != nil { + return err + } + + if len(t.Log) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Log was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Log)))); err != nil { + return err + } + for _, v := range t.Log { + if err := v.MarshalCBOR(w); err != nil { + return err + } + } return nil } @@ -915,6 +940,252 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.LastErr = string(sval) } + // t.Log ([]sealing.Log) (slice) + case "Log": + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Log: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + if extra > 0 { + t.Log = make([]Log, extra) + } + for i := 0; i < int(extra); i++ { + + var v Log + if err := v.UnmarshalCBOR(br); err != nil { + return err + } + + t.Log[i] = v + } + + default: + return fmt.Errorf("unknown struct field %d: '%s'", i, name) + } + } + + return nil +} +func (t *Log) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{165}); err != nil { + return err + } + + // t.Timestamp (uint64) (uint64) + if len("Timestamp") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Timestamp\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Timestamp")))); err != nil { + return err + } + if _, err := w.Write([]byte("Timestamp")); err != nil { + return err + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Timestamp))); err != nil { + return err + } + + // t.Trace (string) (string) + if len("Trace") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Trace\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Trace")))); err != nil { + return err + } + if _, err := w.Write([]byte("Trace")); err != nil { + return err + } + + if len(t.Trace) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Trace was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Trace)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Trace)); err != nil { + return err + } + + // t.Message (string) (string) + if len("Message") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Message\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Message")))); err != nil { + return err + } + if _, err := w.Write([]byte("Message")); err != nil { + return err + } + + if len(t.Message) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Message was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Message)); err != nil { + return err + } + + // t.Kind (string) (string) + if len("Kind") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Kind\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Kind")))); err != nil { + return err + } + if _, err := w.Write([]byte("Kind")); err != nil { + return err + } + + if len(t.Kind) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Kind was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Kind)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Kind)); err != nil { + return err + } + + // t.Params ([]uint8) (slice) + if len("Params") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Params\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Params")))); err != nil { + return err + } + if _, err := w.Write([]byte("Params")); err != nil { + return err + } + + if len(t.Params) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.Params was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Params)))); err != nil { + return err + } + if _, err := w.Write(t.Params); err != nil { + return err + } + return nil +} + +func (t *Log) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra > cbg.MaxLength { + return fmt.Errorf("Log: map struct too large (%d)", extra) + } + + var name string + n := extra + + for i := uint64(0); i < n; i++ { + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + switch name { + // t.Timestamp (uint64) (uint64) + case "Timestamp": + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Timestamp = uint64(extra) + // t.Trace (string) (string) + case "Trace": + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Trace = string(sval) + } + // t.Message (string) (string) + case "Message": + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Message = string(sval) + } + // t.Kind (string) (string) + case "Kind": + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Kind = string(sval) + } + // t.Params ([]uint8) (slice) + case "Params": + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.Params: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.Params = make([]byte, extra) + if _, err := io.ReadFull(br, t.Params); err != nil { + return err + } default: return fmt.Errorf("unknown struct field %d: '%s'", i, name) diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index 05647de6c..9e12bf943 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -63,11 +63,9 @@ func checkSeal(ctx context.Context, maddr address.Address, si *SectorInfo, api s return xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD) } - // TODO: Validate ticket // TODO: Verify commp / commr / proof // TODO: (StateCall PreCommit) return nil - } diff --git a/storage/sealing/types.go b/storage/sealing/types.go index 096e83d08..ae8d7ac23 100644 --- a/storage/sealing/types.go +++ b/storage/sealing/types.go @@ -2,9 +2,8 @@ package sealing import ( sectorbuilder "github.com/filecoin-project/go-sectorbuilder" - "github.com/ipfs/go-cid" - "github.com/filecoin-project/lotus/api" + "github.com/ipfs/go-cid" ) type SealTicket struct { @@ -46,6 +45,17 @@ func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) { return out } +type Log struct { + Timestamp uint64 + Trace string // for errors + + Message string + + // additional data (Event info) + Kind string + Params []byte +} + type SectorInfo struct { State api.SectorState SectorID uint64 @@ -75,7 +85,7 @@ type SectorInfo struct { // Debug LastErr string - // TODO: Log []struct{ts, msg, trace string} + Log []Log } func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { From 052b090bbfe299a0d96c200d9e4c4da585d9fd5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 22 Jan 2020 19:30:56 +0100 Subject: [PATCH 04/24] sealing: Fix planOne for global events --- storage/sealing/checks.go | 3 --- storage/sealing/fsm.go | 7 ++++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index 9e12bf943..e6ee9f7e8 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -63,9 +63,6 @@ func checkSeal(ctx context.Context, maddr address.Address, si *SectorInfo, api s return xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD) } - // TODO: Validate ticket - // TODO: Verify commp / commr / proof - // TODO: (StateCall PreCommit) return nil } diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index 7e46b6eb5..ce4e74408 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -229,6 +229,11 @@ func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []sta return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events) } + if gm, ok := events[0].User.(globalMutator); !ok { + gm.applyGlobal(state) + return nil + } + for _, t := range ts { mut, next := t() @@ -245,6 +250,6 @@ func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []sta return nil } - return xerrors.Errorf("planner for state %s received unexpected event %+v", api.SectorStates[state.State], events[0]) + return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", api.SectorStates[state.State], events[0].User, events[0]) } } From ba5a5d1248ff79629819119ec24eceaa1fcde643 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 22 Jan 2020 20:47:29 +0100 Subject: [PATCH 05/24] sealing: wire up checkPieces and checkSeal --- api/api_storage.go | 2 +- storage/sealing/checks.go | 8 ++++---- storage/sealing/fsm.go | 6 ++++-- storage/sealing/fsm_events.go | 4 ++++ storage/sealing/states.go | 8 ++++++++ 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index e01f859cf..9519e5673 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -45,7 +45,7 @@ const ( PreCommitFailed SealCommitFailed CommitFailed - _ + PackingFailed _ _ _ diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index e6ee9f7e8..a14099c12 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -10,7 +10,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func checkPieces(ctx context.Context, si *SectorInfo, api sealingApi) error { +func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { for i, piece := range si.Pieces { deal, err := api.StateMarketStorageDeal(ctx, piece.DealID, nil) if err != nil { @@ -29,7 +29,7 @@ func checkPieces(ctx context.Context, si *SectorInfo, api sealingApi) error { return nil } -func checkSeal(ctx context.Context, maddr address.Address, si *SectorInfo, api sealingApi) (err error) { +func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api sealingApi) (err error) { ssize, err := api.StateMinerSectorSize(ctx, maddr, nil) if err != nil { return err @@ -45,7 +45,7 @@ func checkSeal(ctx context.Context, maddr address.Address, si *SectorInfo, api s ccmt := &types.Message{ To: actors.StorageMarketAddress, - From: actors.StorageMarketAddress, + From: maddr, Value: types.NewInt(0), GasPrice: types.NewInt(0), GasLimit: types.NewInt(9999999999), @@ -57,7 +57,7 @@ func checkSeal(ctx context.Context, maddr address.Address, si *SectorInfo, api s return xerrors.Errorf("calling ComputeDataCommitment: %w", err) } if r.ExitCode != 0 { - return xerrors.Errorf("receipt for ComputeDataCommitment han exit code %d", r.ExitCode) + return xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode) } if string(r.Return) != string(si.CommD) { return xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index ce4e74408..ba2702696 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -34,8 +34,10 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ api.Unsealed: planOne( on(SectorSealed{}, api.PreCommitting), on(SectorSealFailed{}, api.SealFailed), + on(SectorPackingFailed{}, api.PackingFailed), ), api.PreCommitting: planOne( + on(SectorSealFailed{}, api.SealFailed), on(SectorPreCommitted{}, api.WaitSeed), on(SectorPreCommitFailed{}, api.PreCommitFailed), ), @@ -221,7 +223,7 @@ func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []sta return func(events []statemachine.Event, state *SectorInfo) error { if len(events) != 1 { for _, event := range events { - if gm, ok := event.User.(globalMutator); !ok { + if gm, ok := event.User.(globalMutator); ok { gm.applyGlobal(state) return nil } @@ -229,7 +231,7 @@ func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []sta return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events) } - if gm, ok := events[0].User.(globalMutator); !ok { + if gm, ok := events[0].User.(globalMutator); ok { gm.applyGlobal(state) return nil } diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index 12c47f046..d25668da2 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -60,6 +60,10 @@ func (evt SectorPacked) apply(state *SectorInfo) { state.Pieces = append(state.Pieces, evt.pieces...) } +type SectorPackingFailed struct{ error } + +func (evt SectorPackingFailed) apply(*SectorInfo) {} + type SectorSealed struct { commR []byte commD []byte diff --git a/storage/sealing/states.go b/storage/sealing/states.go index ad4a8f542..219e26475 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -44,6 +44,10 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err } func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { + if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state + return ctx.Send(SectorPackingFailed{xerrors.Errorf("checkPieces error: %w", err)}) + } + log.Infow("performing sector replication...", "sector", sector.SectorID) ticket, err := m.tktFn(ctx.Context()) if err != nil { @@ -66,6 +70,10 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er } func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { + if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil { + return ctx.Send(SectorSealFailed{xerrors.Errorf("checkPieces error: %w", err)}) + } + params := &actors.SectorPreCommitInfo{ SectorNumber: sector.SectorID, From d3a90062728eb61bcd1957891f126f914dd2ccb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 22 Jan 2020 21:29:19 +0100 Subject: [PATCH 06/24] sealing: Wire up sector event log --- storage/sealing/cbor_gen.go | 43 +------------------------------------ storage/sealing/fsm.go | 16 ++++++++++++++ storage/sealing/types.go | 1 - 3 files changed, 17 insertions(+), 43 deletions(-) diff --git a/storage/sealing/cbor_gen.go b/storage/sealing/cbor_gen.go index e13bcf43a..f83d35990 100644 --- a/storage/sealing/cbor_gen.go +++ b/storage/sealing/cbor_gen.go @@ -980,7 +980,7 @@ func (t *Log) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{165}); err != nil { + if _, err := w.Write([]byte{164}); err != nil { return err } @@ -1068,29 +1068,6 @@ func (t *Log) MarshalCBOR(w io.Writer) error { if _, err := w.Write([]byte(t.Kind)); err != nil { return err } - - // t.Params ([]uint8) (slice) - if len("Params") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Params\" was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Params")))); err != nil { - return err - } - if _, err := w.Write([]byte("Params")); err != nil { - return err - } - - if len(t.Params) > cbg.ByteArrayMaxLen { - return xerrors.Errorf("Byte array in field t.Params was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Params)))); err != nil { - return err - } - if _, err := w.Write(t.Params); err != nil { - return err - } return nil } @@ -1168,24 +1145,6 @@ func (t *Log) UnmarshalCBOR(r io.Reader) error { t.Kind = string(sval) } - // t.Params ([]uint8) (slice) - case "Params": - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - - if extra > cbg.ByteArrayMaxLen { - return fmt.Errorf("t.Params: byte array too large (%d)", extra) - } - if maj != cbg.MajByteString { - return fmt.Errorf("expected byte array") - } - t.Params = make([]byte, extra) - if _, err := io.ReadFull(br, t.Params); err != nil { - return err - } default: return fmt.Errorf("unknown struct field %d: '%s'", i, name) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index ba2702696..fced052aa 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -2,7 +2,9 @@ package sealing import ( "context" + "fmt" "reflect" + "time" "golang.org/x/xerrors" @@ -66,6 +68,20 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta ///// // First process all events + for _, event := range events { + l := Log{ + Timestamp: uint64(time.Now().Unix()), + Message: fmt.Sprintf("%+v", event), + Kind: fmt.Sprintf("event;%T", event.User), + } + + if err, iserr := event.User.(xerrors.Formatter); iserr { + l.Trace = fmt.Sprintf("%+v", err) + } + + state.Log = append(state.Log, l) + } + p := fsmPlanners[state.State] if p == nil { return nil, xerrors.Errorf("planner for state %d not found", state.State) diff --git a/storage/sealing/types.go b/storage/sealing/types.go index ae8d7ac23..952c5ec22 100644 --- a/storage/sealing/types.go +++ b/storage/sealing/types.go @@ -53,7 +53,6 @@ type Log struct { // additional data (Event info) Kind string - Params []byte } type SectorInfo struct { From e43174f2af733a45411ebe58bb1a44904d1d59e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 22 Jan 2020 22:16:45 +0100 Subject: [PATCH 07/24] sealing: Not getting seal ticket isn't fatal --- storage/sealing/states.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 219e26475..cb6c03811 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -51,7 +51,7 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er log.Infow("performing sector replication...", "sector", sector.SectorID) ticket, err := m.tktFn(ctx.Context()) if err != nil { - return err + return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)}) } rspco, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, *ticket, sector.pieceInfos()) From 7fb3360d6f6992196bb6f91889a1e2583f1f9dbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 10:46:46 +0100 Subject: [PATCH 08/24] storage: Fix handing stop call --- storage/miner.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/storage/miner.go b/storage/miner.go index 4cd2c675a..ce4f6a9e4 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -36,9 +36,6 @@ type Miner struct { worker address.Address sealing *sealing.Sealing - - stop chan struct{} - stopped chan struct{} } type storageMinerApi interface { @@ -76,9 +73,6 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto tktFn: tktFn, maddr: addr, - - stop: make(chan struct{}), - stopped: make(chan struct{}), } return m, nil @@ -109,14 +103,7 @@ func (m *Miner) Run(ctx context.Context) error { func (m *Miner) Stop(ctx context.Context) error { defer m.sealing.Stop(ctx) - - close(m.stop) - select { - case <-m.stopped: - return nil - case <-ctx.Done(): - return ctx.Err() - } + return nil } func (m *Miner) runPreflightChecks(ctx context.Context) error { From e67ee227a3b4a69e294bce577bece3b741c37033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 11:02:20 +0100 Subject: [PATCH 09/24] sealing: Nonzero exit on commit isn't fatal --- storage/sealing/states.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index cb6c03811..754e88d04 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -206,8 +206,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) } if mw.Receipt.ExitCode != 0 { - log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof) - return xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)}) } return ctx.Send(SectorProving{}) From 11a6bff41655cdd27ee33ec3caf267cd35b1ee04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 11:10:42 +0100 Subject: [PATCH 10/24] miner: Handle stop signal when sleeping --- miner/miner.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/miner/miner.go b/miner/miner.go index 771662750..0395165fa 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -227,7 +227,16 @@ eventLoop: } } else { nextRound := time.Unix(int64(base.ts.MinTimestamp()+uint64(build.BlockDelay*base.nullRounds)), 0) - time.Sleep(time.Until(nextRound)) + + select { + case <-time.After(time.Until(nextRound)): + case <-m.stop: + stopping := m.stopping + m.stop = nil + m.stopping = nil + close(stopping) + return + } } } } From 26e6bc32eeed4af4056abe4ef4c626bc73bf8540 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 13:17:45 +0100 Subject: [PATCH 11/24] sealing: check deal expiration --- storage/sealing/checks.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index a14099c12..01960a09f 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -10,19 +10,33 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +type ErrApi error + +type ErrInvalidDeals error +type ErrExpiredDeals error + func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { + head, err := api.ChainHead(ctx) + if err != nil { + return ErrApi(xerrors.Errorf("getting chain head: %w", err)) + } + for i, piece := range si.Pieces { deal, err := api.StateMarketStorageDeal(ctx, piece.DealID, nil) if err != nil { - return xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err) + return ErrApi(xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err)) } if string(deal.PieceRef) != string(piece.CommP) { - return xerrors.Errorf("piece %d of sector %d refers deal %d with wrong CommP: %x != %x", i, si.SectorID, piece.DealID, piece.CommP, deal.PieceRef) + return ErrInvalidDeals(xerrors.Errorf("piece %d of sector %d refers deal %d with wrong CommP: %x != %x", i, si.SectorID, piece.DealID, piece.CommP, deal.PieceRef)) } if piece.Size != deal.PieceSize { - return xerrors.Errorf("piece %d of sector %d refers deal %d with different size: %d != %d", i, si.SectorID, piece.DealID, piece.Size, deal.PieceSize) + return ErrInvalidDeals(xerrors.Errorf("piece %d of sector %d refers deal %d with different size: %d != %d", i, si.SectorID, piece.DealID, piece.Size, deal.PieceSize)) + } + + if head.Height() >= deal.ProposalExpiration { + return ErrExpiredDeals(xerrors.Errorf("piece %d of sector %d refers expired deal %d - expires %d, head %d", i, si.SectorID, piece.DealID, deal.ProposalExpiration, head.Height())) } } From 02c8ab783962d5480baceb7224576e036473004c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 15:05:44 +0100 Subject: [PATCH 12/24] sealing: Check ticket expiration --- storage/sealing/checks.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index 01960a09f..bc95ba0fd 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -6,6 +6,8 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" ) @@ -15,6 +17,9 @@ type ErrApi error type ErrInvalidDeals error type ErrExpiredDeals error +type ErrBadCommD error +type ErrExpiredTicket error + func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { head, err := api.ChainHead(ctx) if err != nil { @@ -44,9 +49,14 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { } func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api sealingApi) (err error) { - ssize, err := api.StateMinerSectorSize(ctx, maddr, nil) + head, err := api.ChainHead(ctx) if err != nil { - return err + return ErrApi(xerrors.Errorf("getting chain head: %w", err)) + } + + ssize, err := api.StateMinerSectorSize(ctx, maddr, head) + if err != nil { + return ErrApi(err) } ccparams, err := actors.SerializeParams(&actors.ComputeDataCommitmentParams{ @@ -71,10 +81,14 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se return xerrors.Errorf("calling ComputeDataCommitment: %w", err) } if r.ExitCode != 0 { - return xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode) + return ErrBadCommD(xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode)) } if string(r.Return) != string(si.CommD) { - return xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD) + return ErrBadCommD(xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD)) + } + + if int64(head.Height()) - int64(si.Ticket.BlockHeight + build.SealRandomnessLookback) > build.SealRandomnessLookbackLimit { + return ErrExpiredTicket(xerrors.Errorf("ticket expired: seal height: %d, head: %d", si.Ticket.BlockHeight + build.SealRandomnessLookback, head.Height())) } return nil From c3a5da85869241d39a39a11a995b52600fcceca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 15:18:05 +0100 Subject: [PATCH 13/24] storageminer: log flag for sector status --- api/api_storage.go | 11 +++++++++++ cmd/lotus-storage-miner/sectors.go | 18 ++++++++++++++++++ go.mod | 2 +- go.sum | 4 ++-- node/impl/storminer.go | 11 +++++++++++ storage/sealing/checks.go | 4 ++-- storage/sealing/types.go | 2 +- 7 files changed, 46 insertions(+), 6 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 9519e5673..f52f9f3a4 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -107,6 +107,15 @@ type StorageMiner interface { WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error } +type SectorLog struct { + Kind string + Timestamp uint64 + + Trace string + + Message string +} + type SectorInfo struct { SectorID uint64 State SectorState @@ -119,6 +128,8 @@ type SectorInfo struct { Retries uint64 LastErr string + + Log []SectorLog } type SealedRef struct { diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index bdc73f236..9020c9316 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -4,6 +4,7 @@ import ( "fmt" "sort" "strconv" + "time" "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" @@ -41,6 +42,12 @@ var sectorsCmd = &cli.Command{ var sectorsStatusCmd = &cli.Command{ Name: "status", Usage: "Get the seal status of a sector by its ID", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "log", + Usage: "display event log", + }, + }, Action: func(cctx *cli.Context) error { nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) if err != nil { @@ -77,6 +84,17 @@ var sectorsStatusCmd = &cli.Command{ if status.LastErr != "" { fmt.Printf("Last Error:\t\t%s\n", status.LastErr) } + + if cctx.Bool("log") { + fmt.Printf("--------\nEvent Log:\n") + + for i, l := range status.Log { + fmt.Printf("%d.\t%s:\t[%s]\t%s\n", i, time.Unix(int64(l.Timestamp), 0), l.Kind, l.Message) + if l.Trace != "" { + fmt.Printf("\t%s\n", l.Trace) + } + } + } return nil }, } diff --git a/go.mod b/go.mod index 81e54f3d5..e8570aace 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 github.com/filecoin-project/go-paramfetch v0.0.1 - github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200122195713-697609991669 + github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123134702-99304d8411ed github.com/filecoin-project/go-statestore v0.1.0 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-ole/go-ole v1.2.4 // indirect diff --git a/go.sum b/go.sum index 9f88c7178..1dc1f5668 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,8 @@ github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go. github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= -github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200122195713-697609991669 h1:SpwORqUXMVB2Ejr8c4zIGiihxGM5Tu15skOWa5pvRr8= -github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200122195713-697609991669/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123134702-99304d8411ed h1:XBuYbMEzBePbN8ks0P8BDOOo0KJxcseFP8ggDjsb+sk= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123134702-99304d8411ed/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 0e2093efe..b600dfb7f 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -160,6 +160,16 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (api.S deals[i] = piece.DealID } + log := make([]api.SectorLog, len(info.Log)) + for i, l := range info.Log { + log[i] = api.SectorLog{ + Kind: l.Kind, + Timestamp: l.Timestamp, + Trace: l.Trace, + Message: l.Message, + } + } + return api.SectorInfo{ SectorID: sid, State: info.State, @@ -172,6 +182,7 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (api.S Retries: info.Nonce, LastErr: info.LastErr, + Log: log, }, nil } diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index bc95ba0fd..3ec86b9f3 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -87,8 +87,8 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se return ErrBadCommD(xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD)) } - if int64(head.Height()) - int64(si.Ticket.BlockHeight + build.SealRandomnessLookback) > build.SealRandomnessLookbackLimit { - return ErrExpiredTicket(xerrors.Errorf("ticket expired: seal height: %d, head: %d", si.Ticket.BlockHeight + build.SealRandomnessLookback, head.Height())) + if int64(head.Height())-int64(si.Ticket.BlockHeight+build.SealRandomnessLookback) > build.SealRandomnessLookbackLimit { + return ErrExpiredTicket(xerrors.Errorf("ticket expired: seal height: %d, head: %d", si.Ticket.BlockHeight+build.SealRandomnessLookback, head.Height())) } return nil diff --git a/storage/sealing/types.go b/storage/sealing/types.go index 952c5ec22..f0fbe09a4 100644 --- a/storage/sealing/types.go +++ b/storage/sealing/types.go @@ -52,7 +52,7 @@ type Log struct { Message string // additional data (Event info) - Kind string + Kind string } type SectorInfo struct { From 558e4c7669292fd58bdd75d9684df49da2723810 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 15:33:28 +0100 Subject: [PATCH 14/24] Fix tests after sectorbuilder update --- chain/sync_test.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- storage/sbmock/sbmock.go | 4 ++++ 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/chain/sync_test.go b/chain/sync_test.go index 9240dea21..5964d18c8 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -73,7 +73,7 @@ func prepSyncTest(t testing.TB, h int) *syncTestUtil { g, err := gen.NewGenerator() if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } ctx, cancel := context.WithCancel(context.Background()) diff --git a/go.mod b/go.mod index e8570aace..c0a670362 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 github.com/filecoin-project/go-paramfetch v0.0.1 - github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123134702-99304d8411ed + github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 github.com/filecoin-project/go-statestore v0.1.0 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-ole/go-ole v1.2.4 // indirect diff --git a/go.sum b/go.sum index 1dc1f5668..dc41267e3 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,8 @@ github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go. github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= -github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123134702-99304d8411ed h1:XBuYbMEzBePbN8ks0P8BDOOo0KJxcseFP8ggDjsb+sk= -github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123134702-99304d8411ed/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 h1:XChPRKPZL+/N6a3ccLmjCJ7JrR+SFLFJDllv0BkxW4I= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= diff --git a/storage/sbmock/sbmock.go b/storage/sbmock/sbmock.go index b8ca9b69e..f28df2395 100644 --- a/storage/sbmock/sbmock.go +++ b/storage/sbmock/sbmock.go @@ -207,6 +207,10 @@ func (sb *SBMock) GetPath(string, string) (string, error) { panic("nyi") } +func (sb *SBMock) CanCommit(sectorID uint64) (bool, error) { + return true, nil +} + func (sb *SBMock) WorkerStats() sectorbuilder.WorkerStats { panic("nyi") } From f5540195de5519c28c835dcc5b36edf70a70cc27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 15:38:36 +0100 Subject: [PATCH 15/24] worker: Fix progress bars --- cmd/lotus-seal-worker/transfer.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index ea1bec786..54ce552fe 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -14,6 +14,13 @@ import ( "github.com/filecoin-project/lotus/lib/tarutil" ) +func (w *worker) sizeForType(typ string) int64 { + size := int64(w.sb.SectorSize()) + if typ == "cache" { + size *= 10 + } + return size +} func (w *worker) fetch(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) @@ -37,7 +44,7 @@ func (w *worker) fetch(typ string, sectorID uint64) error { return xerrors.Errorf("non-200 code: %d", resp.StatusCode) } - bar := pb.New64(resp.ContentLength) + bar := pb.New64(w.sizeForType(typ)) bar.ShowPercent = true bar.ShowSpeed = true bar.Units = pb.U_BYTES @@ -88,7 +95,7 @@ func (w *worker) push(typ string, sectorID uint64) error { return xerrors.Errorf("opening push reader: %w", err) } - bar := pb.New64(0) + bar := pb.New64(w.sizeForType(typ)) bar.ShowPercent = true bar.ShowSpeed = true bar.ShowCounters = true From 94f2948020372e4004c1c48c8c5837f2b3900030 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 16:38:01 +0100 Subject: [PATCH 16/24] sealing: implement handler for sealFailed --- api/api_storage.go | 1 + cmd/lotus-seal-worker/transfer.go | 1 + storage/miner.go | 1 + storage/sealing/checks.go | 4 ++- storage/sealing/fsm.go | 6 +++- storage/sealing/fsm_events.go | 8 +++++ storage/sealing/sealing.go | 1 + storage/sealing/states.go | 24 ++++++++++++-- storage/sealing/states_failed.go | 55 +++++++++++++++++++++++++++++++ 9 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 storage/sealing/states_failed.go diff --git a/api/api_storage.go b/api/api_storage.go index f52f9f3a4..77cf32a84 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -70,6 +70,7 @@ var SectorStates = []string{ PreCommitFailed: "PreCommitFailed", SealCommitFailed: "SealCommitFailed", CommitFailed: "CommitFailed", + PackingFailed: "PackingFailed", FailedUnrecoverable: "FailedUnrecoverable", diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index 54ce552fe..fcd473392 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/lotus/lib/tarutil" ) + func (w *worker) sizeForType(typ string) int64 { size := int64(w.sb.SectorSize()) if typ == "cache" { diff --git a/storage/miner.go b/storage/miner.go index ce4f6a9e4..3355107da 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -50,6 +50,7 @@ type storageMinerApi interface { StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) + StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index 3ec86b9f3..491e330b0 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -12,6 +12,8 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +// TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting +// We should implement some wait-for-api logic type ErrApi error type ErrInvalidDeals error @@ -78,7 +80,7 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se } r, err := api.StateCall(ctx, ccmt, nil) if err != nil { - return xerrors.Errorf("calling ComputeDataCommitment: %w", err) + return ErrApi(xerrors.Errorf("calling ComputeDataCommitment: %w", err)) } if r.ExitCode != 0 { return ErrBadCommD(xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode)) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index fced052aa..d6bd4160e 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -58,6 +58,10 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ on(SectorFaulty{}, api.Faulty), ), + api.SealFailed: planOne( + on(SectorRetrySeal{}, api.Unsealed), + ), + api.Faulty: planOne( on(SectorFaultReported{}, api.FaultReported), ), @@ -84,7 +88,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta p := fsmPlanners[state.State] if p == nil { - return nil, xerrors.Errorf("planner for state %d not found", state.State) + return nil, xerrors.Errorf("planner for state %s not found", state.State, api.SectorStates[state.State]) } if err := p(events, state); err != nil { diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index d25668da2..948a1653b 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -120,6 +120,14 @@ type SectorProving struct{} func (evt SectorProving) apply(*SectorInfo) {} +// Failed state recovery + +type SectorRetrySeal struct{} + +func (evt SectorRetrySeal) apply(state *SectorInfo) {} + +// Faults + type SectorFaulty struct{} func (evt SectorFaulty) apply(state *SectorInfo) {} diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index 6d235488e..45034047e 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -39,6 +39,7 @@ type sealingApi interface { // TODO: trim down StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) + StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 754e88d04..81c3549df 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -45,7 +45,17 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state - return ctx.Send(SectorPackingFailed{xerrors.Errorf("checkPieces error: %w", err)}) + switch err.(type) { + case ErrApi: + log.Errorf("handleUnsealed: api error, not proceeding: %+v", err) + return nil + case ErrInvalidDeals: + return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)}) + case ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? + return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired deals in sector: %w", err)}) + default: + return xerrors.Errorf("checkPieces sanity check error: %w", err) + } } log.Infow("performing sector replication...", "sector", sector.SectorID) @@ -71,7 +81,17 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil { - return ctx.Send(SectorSealFailed{xerrors.Errorf("checkPieces error: %w", err)}) + switch err.(type) { + case ErrApi: + log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) + return nil + case ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) + return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + case ErrExpiredTicket: + return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + default: + return xerrors.Errorf("checkSeal sanity check error: %w", err) + } } params := &actors.SectorPreCommitInfo{ diff --git a/storage/sealing/states_failed.go b/storage/sealing/states_failed.go new file mode 100644 index 000000000..5d125c2be --- /dev/null +++ b/storage/sealing/states_failed.go @@ -0,0 +1,55 @@ +package sealing + +import ( + "fmt" + "time" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/statemachine" +) + +const minRetryTime = 1 * time.Minute + +func failedCooldown(ctx statemachine.Context, sector SectorInfo) error { + retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime) + if len(sector.Log) > 0 && !time.Now().After(retryStart) { + log.Infof("%s(%d), waiting %s before retrying", api.SectorStates[sector.State], time.Until(retryStart)) + select { + case <-time.After(time.Until(retryStart)): + case <-ctx.Context().Done(): + return ctx.Context().Err() + } + } + + return nil +} + +func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) error { + // TODO: + + act, err := m.api.StateGetActor(ctx.Context(), m.maddr, nil) + if err != nil { + log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err) + return nil + } + + st, err := m.api.StateReadState(ctx.Context(), act, nil) + if err != nil { + log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err) + return nil + } + + _, found := st.State.(map[string]interface{})["PreCommittedSectors"].(map[string]interface{})[fmt.Sprint(sector.SectorID)] + if found { + // TODO: If not expired yet, we can just try reusing sealticket + log.Errorf("sector found in miner preseal array: %+v", sector.SectorID, err) + return nil + } + // + + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorRetrySeal{}) +} From 67b441838bd5caf78a37860f50901a2c8c2067c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 16:47:33 +0100 Subject: [PATCH 17/24] sealing: Actually call handleSealFailed --- storage/sealing/fsm.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index d6bd4160e..e105a328e 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -88,7 +88,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta p := fsmPlanners[state.State] if p == nil { - return nil, xerrors.Errorf("planner for state %s not found", state.State, api.SectorStates[state.State]) + return nil, xerrors.Errorf("planner for state %s not found", api.SectorStates[state.State]) } if err := p(events, state); err != nil { @@ -152,7 +152,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta // Handled failure modes case api.SealFailed: - log.Warnf("sector %d entered unimplemented state 'SealFailed'", state.SectorID) + return m.handleSealFailed, nil case api.PreCommitFailed: log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", state.SectorID) case api.SealCommitFailed: From 01dc9c767e9f5264286a1e7acca08193bb8f8c47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 16:57:14 +0100 Subject: [PATCH 18/24] sealing: Error types that can actually be checked --- storage/sealing/checks.go | 32 ++++++++++++++++---------------- storage/sealing/states_failed.go | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index 491e330b0..18e48faed 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -14,36 +14,36 @@ import ( // TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting // We should implement some wait-for-api logic -type ErrApi error +type ErrApi struct{error} -type ErrInvalidDeals error -type ErrExpiredDeals error +type ErrInvalidDeals struct{error} +type ErrExpiredDeals struct{error} -type ErrBadCommD error -type ErrExpiredTicket error +type ErrBadCommD struct{error} +type ErrExpiredTicket struct{error} func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { head, err := api.ChainHead(ctx) if err != nil { - return ErrApi(xerrors.Errorf("getting chain head: %w", err)) + return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} } for i, piece := range si.Pieces { deal, err := api.StateMarketStorageDeal(ctx, piece.DealID, nil) if err != nil { - return ErrApi(xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err)) + return &ErrApi{xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err)} } if string(deal.PieceRef) != string(piece.CommP) { - return ErrInvalidDeals(xerrors.Errorf("piece %d of sector %d refers deal %d with wrong CommP: %x != %x", i, si.SectorID, piece.DealID, piece.CommP, deal.PieceRef)) + return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with wrong CommP: %x != %x", i, len(si.Pieces), si.SectorID, piece.DealID, piece.CommP, deal.PieceRef)} } if piece.Size != deal.PieceSize { - return ErrInvalidDeals(xerrors.Errorf("piece %d of sector %d refers deal %d with different size: %d != %d", i, si.SectorID, piece.DealID, piece.Size, deal.PieceSize)) + return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorID, piece.DealID, piece.Size, deal.PieceSize)} } if head.Height() >= deal.ProposalExpiration { - return ErrExpiredDeals(xerrors.Errorf("piece %d of sector %d refers expired deal %d - expires %d, head %d", i, si.SectorID, piece.DealID, deal.ProposalExpiration, head.Height())) + return &ErrExpiredDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers expired deal %d - expires %d, head %d", i, len(si.Pieces), si.SectorID, piece.DealID, deal.ProposalExpiration, head.Height())} } } @@ -53,12 +53,12 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api sealingApi) (err error) { head, err := api.ChainHead(ctx) if err != nil { - return ErrApi(xerrors.Errorf("getting chain head: %w", err)) + return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} } ssize, err := api.StateMinerSectorSize(ctx, maddr, head) if err != nil { - return ErrApi(err) + return &ErrApi{err} } ccparams, err := actors.SerializeParams(&actors.ComputeDataCommitmentParams{ @@ -80,17 +80,17 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se } r, err := api.StateCall(ctx, ccmt, nil) if err != nil { - return ErrApi(xerrors.Errorf("calling ComputeDataCommitment: %w", err)) + return &ErrApi{xerrors.Errorf("calling ComputeDataCommitment: %w", err)} } if r.ExitCode != 0 { - return ErrBadCommD(xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode)) + return &ErrBadCommD{xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode)} } if string(r.Return) != string(si.CommD) { - return ErrBadCommD(xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD)) + return &ErrBadCommD{xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD)} } if int64(head.Height())-int64(si.Ticket.BlockHeight+build.SealRandomnessLookback) > build.SealRandomnessLookbackLimit { - return ErrExpiredTicket(xerrors.Errorf("ticket expired: seal height: %d, head: %d", si.Ticket.BlockHeight+build.SealRandomnessLookback, head.Height())) + return &ErrExpiredTicket{xerrors.Errorf("ticket expired: seal height: %d, head: %d", si.Ticket.BlockHeight+build.SealRandomnessLookback, head.Height())} } return nil diff --git a/storage/sealing/states_failed.go b/storage/sealing/states_failed.go index 5d125c2be..8582471e7 100644 --- a/storage/sealing/states_failed.go +++ b/storage/sealing/states_failed.go @@ -13,7 +13,7 @@ const minRetryTime = 1 * time.Minute func failedCooldown(ctx statemachine.Context, sector SectorInfo) error { retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime) if len(sector.Log) > 0 && !time.Now().After(retryStart) { - log.Infof("%s(%d), waiting %s before retrying", api.SectorStates[sector.State], time.Until(retryStart)) + log.Infof("%s(%d), waiting %s before retrying", api.SectorStates[sector.State], sector.SectorID, time.Until(retryStart)) select { case <-time.After(time.Until(retryStart)): case <-ctx.Context().Done(): From 08a2e0f82c6e9e8023bb1007f991dd9bdae801fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 17:02:55 +0100 Subject: [PATCH 19/24] sealing: Errors are hard --- storage/sealing/states.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 81c3549df..519245e03 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -46,12 +46,12 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state switch err.(type) { - case ErrApi: + case *ErrApi: log.Errorf("handleUnsealed: api error, not proceeding: %+v", err) return nil - case ErrInvalidDeals: + case *ErrInvalidDeals: return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)}) - case ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? + case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired deals in sector: %w", err)}) default: return xerrors.Errorf("checkPieces sanity check error: %w", err) @@ -82,12 +82,12 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil { switch err.(type) { - case ErrApi: + case *ErrApi: log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) return nil - case ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) + case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) - case ErrExpiredTicket: + case *ErrExpiredTicket: return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) default: return xerrors.Errorf("checkSeal sanity check error: %w", err) From df9dfa9d0c61cc3709be4c69cc039dd017746c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 17:11:58 +0100 Subject: [PATCH 20/24] sealing: Don't infinite-loop on fatal errors --- storage/sealing/fsm.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index e105a328e..3d3253072 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -21,9 +21,8 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface return func(ctx statemachine.Context, si SectorInfo) error { err := next(ctx, si) if err != nil { - if err := ctx.Send(SectorFatalError{error: err}); err != nil { - return xerrors.Errorf("error while sending error: reporting %+v: %w", err, err) - } + log.Errorf("unhandled sector error (%d): %+v", si.SectorID, err) + return nil } return nil From 7e43c40529dbb141fb8fc4e53bd6e8ec13d679cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 17:15:45 +0100 Subject: [PATCH 21/24] storageminer: Use tabwriter in sectors list --- cmd/lotus-storage-miner/sectors.go | 11 ++++++++--- storage/sealing/checks.go | 10 +++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 9020c9316..e4f76545e 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -2,8 +2,10 @@ package main import ( "fmt" + "os" "sort" "strconv" + "text/tabwriter" "time" "golang.org/x/xerrors" @@ -149,17 +151,19 @@ var sectorsListCmd = &cli.Command{ return list[i] < list[j] }) + w := tabwriter.NewWriter(os.Stdout, 8, 4, 0, ' ', 0) + for _, s := range list { st, err := nodeApi.SectorsStatus(ctx, s) if err != nil { - fmt.Printf("%d:\tError: %s\n", s, err) + fmt.Fprintf(w, "%d:\tError: %s\n", s, err) continue } _, inSSet := commitedIDs[s] _, inPSet := provingIDs[s] - fmt.Printf("%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n", + fmt.Fprintf(w, "%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n", s, api.SectorStates[st.State], yesno(inSSet), @@ -169,7 +173,8 @@ var sectorsListCmd = &cli.Command{ st.Deals, ) } - return nil + + return w.Flush() }, } diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index 18e48faed..94ad95771 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -14,13 +14,13 @@ import ( // TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting // We should implement some wait-for-api logic -type ErrApi struct{error} +type ErrApi struct{ error } -type ErrInvalidDeals struct{error} -type ErrExpiredDeals struct{error} +type ErrInvalidDeals struct{ error } +type ErrExpiredDeals struct{ error } -type ErrBadCommD struct{error} -type ErrExpiredTicket struct{error} +type ErrBadCommD struct{ error } +type ErrExpiredTicket struct{ error } func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { head, err := api.ChainHead(ctx) From de52d3cadd131d65a2eae8dfc0480d476f2cd8fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 18:34:04 +0100 Subject: [PATCH 22/24] sealing: Handlef for PreCommitFailed --- cmd/lotus-storage-miner/sectors.go | 2 +- storage/miner.go | 2 +- storage/sealing/fsm.go | 6 ++- storage/sealing/fsm_events.go | 8 +++ storage/sealing/sealing.go | 2 +- storage/sealing/states_failed.go | 84 ++++++++++++++++++++++++++---- 6 files changed, 90 insertions(+), 14 deletions(-) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index e4f76545e..ad5d504ed 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -151,7 +151,7 @@ var sectorsListCmd = &cli.Command{ return list[i] < list[j] }) - w := tabwriter.NewWriter(os.Stdout, 8, 4, 0, ' ', 0) + w := tabwriter.NewWriter(os.Stdout, 8, 4, 1, ' ', 0) for _, s := range list { st, err := nodeApi.SectorsStatus(ctx, s) diff --git a/storage/miner.go b/storage/miner.go index 3355107da..06eb2c47c 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -50,7 +50,6 @@ type storageMinerApi interface { StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) - StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) @@ -59,6 +58,7 @@ type storageMinerApi interface { ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) WalletSign(context.Context, address.Address, []byte) (*types.Signature, error) WalletBalance(context.Context, address.Address) (types.BigInt, error) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index 3d3253072..60bab48d0 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -60,6 +60,10 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ api.SealFailed: planOne( on(SectorRetrySeal{}, api.Unsealed), ), + api.PreCommitFailed: planOne( + on(SectorRetryPreCommit{}, api.PreCommitting), + on(SectorRetryWaitSeed{}, api.WaitSeed), + ), api.Faulty: planOne( on(SectorFaultReported{}, api.FaultReported), @@ -153,7 +157,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case api.SealFailed: return m.handleSealFailed, nil case api.PreCommitFailed: - log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", state.SectorID) + return m.handlePreCommitFailed, nil case api.SealCommitFailed: log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", state.SectorID) case api.CommitFailed: diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index 948a1653b..ee4963750 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -126,6 +126,14 @@ type SectorRetrySeal struct{} func (evt SectorRetrySeal) apply(state *SectorInfo) {} +type SectorRetryPreCommit struct{} + +func (evt SectorRetryPreCommit) apply(state *SectorInfo) {} + +type SectorRetryWaitSeed struct{} + +func (evt SectorRetryWaitSeed) apply(state *SectorInfo) {} + // Faults type SectorFaulty struct{} diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index 45034047e..6d0c6bb46 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -39,7 +39,6 @@ type sealingApi interface { // TODO: trim down StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) - StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) @@ -48,6 +47,7 @@ type sealingApi interface { // TODO: trim down ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) WalletSign(context.Context, address.Address, []byte) (*types.Signature, error) WalletBalance(context.Context, address.Address) (types.BigInt, error) diff --git a/storage/sealing/states_failed.go b/storage/sealing/states_failed.go index 8582471e7..fcef3b30c 100644 --- a/storage/sealing/states_failed.go +++ b/storage/sealing/states_failed.go @@ -1,10 +1,14 @@ package sealing import ( + "bytes" "fmt" "time" + "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/lib/statemachine" ) @@ -24,28 +28,41 @@ func failedCooldown(ctx statemachine.Context, sector SectorInfo) error { return nil } -func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) error { - // TODO: - +func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo) (*actors.PreCommittedSector, bool) { act, err := m.api.StateGetActor(ctx.Context(), m.maddr, nil) if err != nil { log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err) - return nil + return nil, true } - st, err := m.api.StateReadState(ctx.Context(), act, nil) + st, err := m.api.ChainReadObj(ctx.Context(), act.Head) if err != nil { log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err) - return nil + return nil, true } - _, found := st.State.(map[string]interface{})["PreCommittedSectors"].(map[string]interface{})[fmt.Sprint(sector.SectorID)] + var state actors.StorageMinerActorState + if err := state.UnmarshalCBOR(bytes.NewReader(st)); err != nil { + log.Errorf("handleSealFailed(%d): temp error: unmarshaling miner state: %+v", sector.SectorID, err) + return nil, true + } + + pci, found := state.PreCommittedSectors[fmt.Sprint(sector.SectorID)] if found { // TODO: If not expired yet, we can just try reusing sealticket - log.Errorf("sector found in miner preseal array: %+v", sector.SectorID, err) - return nil + log.Errorf("sector %d found in miner preseal array: %+v", sector.SectorID, err) + return pci, true + } + + return nil, false +} + +func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) error { + + if _, is := m.checkPreCommitted(ctx, sector); is { + // TODO: Remove this after we can re-precommit + return nil // noop, for now } - // if err := failedCooldown(ctx, sector); err != nil { return err @@ -53,3 +70,50 @@ func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorRetrySeal{}) } + +func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error { + if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil { + switch err.(type) { + case *ErrApi: + log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) + return nil + case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) + return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + case *ErrExpiredTicket: + return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + default: + return xerrors.Errorf("checkSeal sanity check error: %w", err) + } + } + + if pci, is := m.checkPreCommitted(ctx, sector); is && pci != nil { + if sector.PreCommitMessage != nil { + log.Warn("sector %d is precommitted on chain, but we don't have precommit message", sector.SectorID) + return nil // TODO: SeedWait needs this currently + } + + if string(pci.Info.CommR) != string(sector.CommR) { + log.Warn("sector %d is precommitted on chain, with different CommR: %x != %x", sector.SectorID, pci.Info.CommR, sector.CommR) + return nil // TODO: remove when the actor allows re-precommit + } + + // TODO: we could compare more things, but I don't think we really need to + // CommR tells us that CommD (and CommPs), and the ticket are all matching + + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorRetryWaitSeed{}) + } + + if sector.PreCommitMessage != nil { + log.Warn("retrying precommit even though the message failed to apply") + } + + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorRetryPreCommit{}) +} From 6b2b22782b2bf2d9c68f1c5d5ced1f7f8f2345c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 18:45:57 +0100 Subject: [PATCH 23/24] sealing: PreCommitFailed can go to SealFailed --- storage/sealing/fsm.go | 1 + storage/sealing/states_failed.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index 60bab48d0..ad0803488 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -63,6 +63,7 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ api.PreCommitFailed: planOne( on(SectorRetryPreCommit{}, api.PreCommitting), on(SectorRetryWaitSeed{}, api.WaitSeed), + on(SectorSealFailed{}, api.SealFailed), ), api.Faulty: planOne( diff --git a/storage/sealing/states_failed.go b/storage/sealing/states_failed.go index fcef3b30c..4004edc90 100644 --- a/storage/sealing/states_failed.go +++ b/storage/sealing/states_failed.go @@ -80,7 +80,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) case *ErrExpiredTicket: - return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + return ctx.Send(SectorSealFailed{xerrors.Errorf("ticket expired error: %w", err)}) default: return xerrors.Errorf("checkSeal sanity check error: %w", err) } From 2f2e7be12ae4ae1f93123388c1fbe1c4362495e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 24 Jan 2020 21:15:02 +0100 Subject: [PATCH 24/24] sealing: docstrings for sanity-checks --- storage/sealing/checks.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index 94ad95771..9baf2d993 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -22,6 +22,11 @@ type ErrExpiredDeals struct{ error } type ErrBadCommD struct{ error } type ErrExpiredTicket struct{ error } +// checkPieces validates that: +// - Each piece han a corresponding on chain deal +// - Piece commitments match with on chain deals +// - Piece sizes match +// - Deals aren't expired func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { head, err := api.ChainHead(ctx) if err != nil { @@ -50,6 +55,8 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { return nil } +// checkSeal checks that data commitment generated in the sealing process +// matches pieces, and that the seal ticket isn't expired func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api sealingApi) (err error) { head, err := api.ChainHead(ctx) if err != nil {