pipe sealing events to the journal.

This commit is contained in:
Raúl Kripalani 2020-07-21 13:02:51 +01:00
parent 7459ec6bba
commit d6e6eedd58
7 changed files with 85 additions and 69 deletions

2
go.mod
View File

@ -32,7 +32,7 @@ require (
github.com/filecoin-project/sector-storage v0.0.0-20200717213554-a109ef9cbeab
github.com/filecoin-project/specs-actors v0.7.3-0.20200717200758-365408676dbb
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
github.com/filecoin-project/storage-fsm v0.0.0-20200717125541-d575c3a5f7f2
github.com/filecoin-project/storage-fsm v0.0.0-20200721113842-ab98dc7ab341
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-kit/kit v0.10.0
github.com/go-ole/go-ole v1.2.4 // indirect

4
go.sum
View File

@ -275,8 +275,8 @@ github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sg
github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/storage-fsm v0.0.0-20200717125541-d575c3a5f7f2 h1:A9zUXOMuVnSTp9a0i0KtHkB05hA8mRWVLls6Op9Czuo=
github.com/filecoin-project/storage-fsm v0.0.0-20200717125541-d575c3a5f7f2/go.mod h1:1CGbd11KkHuyWPT+xwwCol1zl/jnlpiKD2L4fzKxaiI=
github.com/filecoin-project/storage-fsm v0.0.0-20200721113842-ab98dc7ab341 h1:4catrjuFXo/VcQcmvAMyeBYEp35IHFi+Tqz0dNAJp54=
github.com/filecoin-project/storage-fsm v0.0.0-20200721113842-ab98dc7ab341/go.mod h1:1CGbd11KkHuyWPT+xwwCol1zl/jnlpiKD2L4fzKxaiI=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=

View File

@ -188,7 +188,7 @@ func StorageMiner(params StorageMinerParams) (*storage.Miner, error) {
return nil, err
}
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd)
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd, jrnl)
if err != nil {
return nil, err
}

View File

@ -1,34 +0,0 @@
package storage
import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
)
type WindowPoStEvt struct {
State string
Deadline *miner.DeadlineInfo
Height abi.ChainEpoch
TipSet []cid.Cid
Error error `json:",omitempty"`
Proofs *WindowPoStEvt_Proofs `json:",omitempty"`
Recoveries *WindowPoStEvt_Recoveries `json:",omitempty"`
Faults *WindowPoStEvt_Faults `json:",omitempty"`
}
type WindowPoStEvt_Proofs struct {
Partitions []miner.PoStPartition
MessageCID cid.Cid
}
type WindowPoStEvt_Recoveries struct {
Declarations []miner.RecoveryDeclaration
MessageCID cid.Cid
}
type WindowPoStEvt_Faults struct {
Declarations []miner.FaultDeclaration
MessageCID cid.Cid
}

View File

@ -24,6 +24,7 @@ import (
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/modules/dtypes"
sealing "github.com/filecoin-project/storage-fsm"
)
@ -43,6 +44,18 @@ type Miner struct {
getSealDelay dtypes.GetSealingDelayFunc
sealing *sealing.Sealing
jrnl journal.Journal
sealingEvtType journal.EventType
}
// SealingStateEvt is a journal event that records a sector state transition.
type SealingStateEvt struct {
SectorNumber abi.SectorNumber
SectorType abi.RegisteredSealProof
From sealing.SectorState
After sealing.SectorState
Error string
}
type storageMinerApi interface {
@ -80,7 +93,7 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*Miner, error) {
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc, jrnl journal.Journal) (*Miner, error) {
m := &Miner{
api: api,
h: h,
@ -92,6 +105,9 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
maddr: maddr,
worker: worker,
getSealDelay: gsd,
jrnl: jrnl,
sealingEvtType: jrnl.RegisterEventType("storage", "sealing_states"),
}
return m, nil
@ -110,13 +126,25 @@ func (m *Miner) Run(ctx context.Context) error {
evts := events.NewEvents(ctx, m.api)
adaptedAPI := NewSealingAPIAdapter(m.api)
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
m.sealing = sealing.New(adaptedAPI, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingDelayFunc(m.getSealDelay))
m.sealing = sealing.New(adaptedAPI, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingDelayFunc(m.getSealDelay), m.handleSealingNotifications)
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
return nil
}
func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) {
journal.MaybeAddEntry(m.jrnl, m.sealingEvtType, func() interface{} {
return SealingStateEvt{
SectorNumber: before.SectorNumber,
SectorType: before.SectorType,
From: before.State,
After: after.State,
Error: after.LastErr,
}
})
}
func (m *Miner) Stop(ctx context.Context) error {
return m.sealing.Stop(ctx)
}

View File

@ -27,7 +27,7 @@ import (
var errNoPartitions = errors.New("no partitions")
func (s *WindowPoStScheduler) failPost(err error, deadline *miner.DeadlineInfo) {
journal.MaybeAddEntry(s.jrnl, s.evtTypes[evtTypeWindowPoSt], func() interface{} {
journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
return WindowPoStEvt{
State: "failed",
Deadline: s.activeDeadline,
@ -51,7 +51,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli
s.abort = abort
s.activeDeadline = deadline
journal.MaybeAddEntry(s.jrnl, s.evtTypes[evtTypeWindowPoSt], func() interface{} {
journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
return WindowPoStEvt{
State: "started",
Deadline: s.activeDeadline,
@ -82,7 +82,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli
return
}
journal.MaybeAddEntry(s.jrnl, s.evtTypes[evtTypeWindowPoSt], func() interface{} {
journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
return WindowPoStEvt{
State: "succeeded",
Deadline: s.activeDeadline,
@ -148,7 +148,7 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
Recoveries: []miner.RecoveryDeclaration{},
}
defer journal.MaybeAddEntry(s.jrnl, s.evtTypes[evtTypeWindowPoSt], func() interface{} {
defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
var mcid cid.Cid
if sm != nil {
mcid = sm.Cid()
@ -258,7 +258,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
Faults: []miner.FaultDeclaration{},
}
defer journal.MaybeAddEntry(s.jrnl, s.evtTypes[evtTypeWindowPoSt], func() interface{} {
defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
var mcid cid.Cid
if sm != nil {
mcid = sm.Cid()
@ -519,7 +519,7 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
var sm *types.SignedMessage
defer journal.MaybeAddEntry(s.jrnl, s.evtTypes[evtTypeWindowPoSt], func() interface{} {
defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
var mcid cid.Cid
if sm != nil {
mcid = sm.Cid()

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/ipfs/go-cid"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
@ -22,10 +23,32 @@ import (
const StartConfidence = 4 // TODO: config
// Journal event types.
const (
evtTypeWindowPoSt = iota
)
type WindowPoStEvt struct {
State string
Deadline *miner.DeadlineInfo
Height abi.ChainEpoch
TipSet []cid.Cid
Error error `json:",omitempty"`
Proofs *WindowPoStEvt_Proofs `json:",omitempty"`
Recoveries *WindowPoStEvt_Recoveries `json:",omitempty"`
Faults *WindowPoStEvt_Faults `json:",omitempty"`
}
type WindowPoStEvt_Proofs struct {
Partitions []miner.PoStPartition
MessageCID cid.Cid `json:",omitempty"`
}
type WindowPoStEvt_Recoveries struct {
Declarations []miner.RecoveryDeclaration
MessageCID cid.Cid `json:",omitempty"`
}
type WindowPoStEvt_Faults struct {
Declarations []miner.FaultDeclaration
MessageCID cid.Cid `json:",omitempty"`
}
type WindowPoStScheduler struct {
api storageMinerApi
@ -42,8 +65,9 @@ type WindowPoStScheduler struct {
// if a post is in progress, this indicates for which ElectionPeriodStart
activeDeadline *miner.DeadlineInfo
abort context.CancelFunc
jrnl journal.Journal
evtTypes [1]journal.EventType
jrnl journal.Journal
wdPoStEvtType journal.EventType
// failed abi.ChainEpoch // eps
// failLk sync.Mutex
@ -67,12 +91,10 @@ func NewWindowedPoStScheduler(api storageMinerApi, sb storage.Prover, ft sectors
proofType: rt,
partitionSectors: mi.WindowPoStPartitionSectors,
actor: actor,
worker: worker,
jrnl: jrnl,
evtTypes: [...]journal.EventType{
evtTypeWindowPoSt: jrnl.RegisterEventType("storage", "wdpost"),
},
actor: actor,
worker: worker,
jrnl: jrnl,
wdPoStEvtType: jrnl.RegisterEventType("storage", "wdpost"),
}, nil
}
@ -224,19 +246,19 @@ func (s *WindowPoStScheduler) abortActivePoSt() {
if s.abort != nil {
s.abort()
journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
return WindowPoStEvt{
State: "abort",
Deadline: s.activeDeadline,
Height: s.cur.Height(),
TipSet: s.cur.Cids(),
}
})
log.Warnf("Aborting Window PoSt (Deadline: %+v)", s.activeDeadline)
}
journal.MaybeAddEntry(s.jrnl, s.evtTypes[evtTypeWindowPoSt], func() interface{} {
return WindowPoStEvt{
State: "abort",
Deadline: s.activeDeadline,
Height: s.cur.Height(),
TipSet: s.cur.Cids(),
}
})
log.Warnf("Aborting Window PoSt (Deadline: %+v)", s.activeDeadline)
s.activeDeadline = nil
s.abort = nil
}