Merge pull request #2455 from filecoin-project/inmem-journal

make journal pluggable; record deals, sealing, wdpost, mempool events
This commit is contained in:
Whyrusleeping 2020-09-14 11:17:21 -07:00 committed by GitHub
commit b212368a70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 932 additions and 232 deletions

View File

@ -27,7 +27,6 @@ type heightEvents struct {
}
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
defer span.End()
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
@ -145,12 +144,11 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
}
// ChainAt invokes the specified `HeightHandler` when the chain reaches the
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
//
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {
e.lk.Lock() // Tricky locking, check your locks if you modify this function!
best, err := e.tsc.best()

View File

@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -83,6 +84,26 @@ const (
localUpdates = "update"
)
// Journal event types.
const (
evtTypeMpoolAdd = iota
evtTypeMpoolRemove
evtTypeMpoolRepub
)
// MessagePoolEvt is the journal entry for message pool events.
type MessagePoolEvt struct {
Action string
Messages []MessagePoolEvtMessage
Error error `json:",omitempty"`
}
type MessagePoolEvtMessage struct {
types.Message
CID cid.Cid
}
// this is *temporary* mutilation until we have implemented uncapped miner penalties -- it will go
// away in the next fork.
var strictBaseFeeValidation = false
@ -140,6 +161,8 @@ type MessagePool struct {
netName dtypes.NetworkName
sigValCache *lru.TwoQueueCache
evtTypes [3]journal.EventType
}
type msgSet struct {
@ -316,6 +339,11 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
api: api,
netName: netName,
cfg: cfg,
evtTypes: [...]journal.EventType{
evtTypeMpoolAdd: journal.J.RegisterEventType("mpool", "add"),
evtTypeMpoolRemove: journal.J.RegisterEventType("mpool", "remove"),
evtTypeMpoolRepub: journal.J.RegisterEventType("mpool", "repub"),
},
}
// enable initial prunes
@ -367,10 +395,12 @@ func (mp *MessagePool) runLoop() {
if err := mp.republishPendingMessages(); err != nil {
log.Errorf("error while republishing messages: %s", err)
}
case <-mp.pruneTrigger:
if err := mp.pruneExcessMessages(); err != nil {
log.Errorf("failed to prune excess messages from mempool: %s", err)
}
case <-mp.closer:
mp.repubTk.Stop()
return
@ -700,6 +730,14 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
Type: api.MpoolAdd,
Message: m,
}, localUpdates)
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
return MessagePoolEvt{
Action: "add",
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}},
}
})
return nil
}
@ -862,6 +900,12 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool)
Message: m,
}, localUpdates)
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
return MessagePoolEvt{
Action: "remove",
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}}}
})
mp.currentSize--
}

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/ipfs/go-cid"
)
@ -146,6 +147,19 @@ loop:
}
}
if len(msgs) > 0 {
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
msgs := make([]MessagePoolEvtMessage, 0, len(msgs))
for _, m := range msgs {
msgs = append(msgs, MessagePoolEvtMessage{Message: m.Message, CID: m.Cid()})
}
return MessagePoolEvt{
Action: "repub",
Messages: msgs,
}
})
}
// track most recently republished messages
republished := make(map[cid.Cid]struct{})
for _, m := range msgs[:count] {

View File

@ -72,6 +72,20 @@ func init() {
// ReorgNotifee represents a callback that gets called upon reorgs.
type ReorgNotifee func(rev, app []*types.TipSet) error
// Journal event types.
const (
evtTypeHeadChange = iota
)
type HeadChangeEvt struct {
From types.TipSetKey
FromHeight abi.ChainEpoch
To types.TipSetKey
ToHeight abi.ChainEpoch
RevertCount int
ApplyCount int
}
// ChainStore is the main point of access to chain data.
//
// Raw chain data is stored in the Blockstore, with relevant markers (genesis,
@ -103,6 +117,8 @@ type ChainStore struct {
tsCache *lru.ARCCache
vmcalls vm.SyscallBuilder
evtTypes [1]journal.EventType
}
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
@ -118,6 +134,10 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB
vmcalls: vmcalls,
}
cs.evtTypes = [1]journal.EventType{
evtTypeHeadChange: journal.J.RegisterEventType("sync", "head_change"),
}
ci := NewChainIndex(cs.LoadTipSet)
cs.cindex = ci
@ -344,12 +364,15 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
continue
}
journal.Add("sync", map[string]interface{}{
"op": "headChange",
"from": r.old.Key(),
"to": r.new.Key(),
"rev": len(revert),
"apply": len(apply),
journal.J.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
return HeadChangeEvt{
From: r.old.Key(),
FromHeight: r.old.Height(),
To: r.new.Key(),
ToHeight: r.new.Height(),
RevertCount: len(revert),
ApplyCount: len(apply),
}
})
// reverse the apply array

View File

@ -44,6 +44,7 @@ import (
lcli "github.com/filecoin-project/lotus/cli"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -459,6 +460,12 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err
}
if jrnl, err := journal.OpenFSJournal(lr, journal.DefaultDisabledEvents); err == nil {
journal.J = jrnl
} else {
return fmt.Errorf("failed to open filesystem journal: %w", err)
}
m := miner.NewMiner(api, epp, a, slashfilter.New(mds))
{
if err := m.Start(ctx); err != nil {

View File

@ -189,6 +189,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
state.Log = append(state.Log, l)
}
if m.notifee != nil {
defer func(before SectorInfo) {
m.notifee(before, *state)
}(*state) // take safe-ish copy of the before state (except for nested pointers)
}
p := fsmPlanners[state.State]
if p == nil {
return nil, 0, xerrors.Errorf("planner for state %s not found", state.State)

View File

@ -27,6 +27,7 @@ type test struct {
}
func TestHappyPath(t *testing.T) {
var notif []struct{ before, after SectorInfo }
ma, _ := address.NewIDAddress(55151)
m := test{
s: &Sealing{
@ -34,6 +35,9 @@ func TestHappyPath(t *testing.T) {
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
notifee: func(before, after SectorInfo) {
notif = append(notif, struct{ before, after SectorInfo }{before, after})
},
},
t: t,
state: &SectorInfo{State: Packing},
@ -68,6 +72,16 @@ func TestHappyPath(t *testing.T) {
m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)
expected := []SectorState{Packing, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
}
if n.after.State != expected[i+1] {
t.Fatalf("expected after state: %s, got: %s", expected[i+1], n.after.State)
}
}
}
func TestSeedRevert(t *testing.T) {

View File

@ -61,6 +61,8 @@ type SealingAPI interface {
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
}
type SectorStateNotifee func(before, after SectorInfo)
type Sealing struct {
api SealingAPI
feeCfg FeeConfig
@ -79,6 +81,8 @@ type Sealing struct {
upgradeLk sync.Mutex
toUpgrade map[abi.SectorNumber]struct{}
notifee SectorStateNotifee
stats SectorStats
getConfig GetSealingConfigFunc
@ -101,7 +105,7 @@ type UnsealedSectorInfo struct {
pieceSizes []abi.UnpaddedPieceSize
}
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc) *Sealing {
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee) *Sealing {
s := &Sealing{
api: api,
feeCfg: fc,
@ -118,6 +122,9 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
},
toUpgrade: map[abi.SectorNumber]struct{}{},
notifee: notifee,
getConfig: gc,
stats: SectorStats{

4
go.sum
View File

@ -91,7 +91,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/briandowns/spinner v1.11.1 h1:OixPqDEcX3juo5AjQZAnFPbeUA0jvkp2qzB5gOZJ/L0=
github.com/briandowns/spinner v1.11.1/go.mod h1:QOuQk7x+EaDASo80FEXwlwiA+j/PPIcX3FScO+3/ZPQ=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
@ -241,7 +240,6 @@ github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 h
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df h1:m2esXSuGBkuXlRyCsl1a/7/FkFam63o1OzIgzaHtOfI=
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200909080127-001afaca718c h1:HHRMFpU8OrODDUja5NmGWNBAVGoSy4MRjxgZa+a0qIw=
github.com/filecoin-project/go-state-types v0.0.0-20200909080127-001afaca718c/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
@ -253,7 +251,6 @@ github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZO
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4=
github.com/filecoin-project/specs-actors v0.9.7 h1:7PAZ8kdqwBdmgf/23FCkQZLCXcVu02XJrkpkhBikiA8=
github.com/filecoin-project/specs-actors v0.9.7/go.mod h1:wM2z+kwqYgXn5Z7scV1YHLyd1Q1cy0R8HfTIWQ0BFGU=
github.com/filecoin-project/specs-actors v0.9.8 h1:45fnx/BsseFL3CtvSoR6CszFY26TFtsh9AHwCW2vkg8=
github.com/filecoin-project/specs-actors v0.9.8/go.mod h1:xFObDoWPySBNTNBrGXVVrutmgSZH/mMo46Q1bec/0hw=
@ -506,7 +503,6 @@ github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.1.2 h1:25Ll9kIXCE+DY0dicvfS3KMw+U5sd01b/FJbA7KAbhg=
github.com/ipfs/go-graphsync v0.1.2/go.mod h1:sLXVXm1OxtE2XYPw62MuXCdAuNwkAdsbnfrmos5odbA=
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=

136
journal/fs.go Normal file
View File

@ -0,0 +1,136 @@
package journal
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/repo"
)
const RFC3339nocolon = "2006-01-02T150405Z0700"
// fsJournal is a basic journal backed by files on a filesystem.
type fsJournal struct {
EventTypeRegistry
dir string
sizeLimit int64
fi *os.File
fSize int64
incoming chan *Event
closing chan struct{}
closed chan struct{}
}
// OpenFSJournal constructs a rolling filesystem journal, with a default
// per-file size limit of 1GiB.
func OpenFSJournal(lr repo.LockedRepo, disabled DisabledEvents) (Journal, error) {
dir := filepath.Join(lr.Path(), "journal")
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
}
f := &fsJournal{
EventTypeRegistry: NewEventTypeRegistry(disabled),
dir: dir,
sizeLimit: 1 << 30,
incoming: make(chan *Event, 32),
closing: make(chan struct{}),
closed: make(chan struct{}),
}
if err := f.rollJournalFile(); err != nil {
return nil, err
}
go f.runLoop()
return f, nil
}
func (f *fsJournal) RecordEvent(evtType EventType, supplier func() interface{}) {
defer func() {
if r := recover(); r != nil {
log.Warnf("recovered from panic while recording journal event; type=%s, err=%v", evtType, r)
}
}()
if !evtType.Enabled() {
return
}
je := &Event{
EventType: evtType,
Timestamp: build.Clock.Now(),
Data: supplier(),
}
select {
case f.incoming <- je:
case <-f.closing:
log.Warnw("journal closed but tried to log event", "event", je)
}
}
func (f *fsJournal) Close() error {
close(f.closing)
<-f.closed
return nil
}
func (f *fsJournal) putEvent(evt *Event) error {
b, err := json.Marshal(evt)
if err != nil {
return err
}
n, err := f.fi.Write(append(b, '\n'))
if err != nil {
return err
}
f.fSize += int64(n)
if f.fSize >= f.sizeLimit {
_ = f.rollJournalFile()
}
return nil
}
func (f *fsJournal) rollJournalFile() error {
if f.fi != nil {
_ = f.fi.Close()
}
nfi, err := os.Create(filepath.Join(f.dir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(RFC3339nocolon))))
if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err)
}
f.fi = nfi
f.fSize = 0
return nil
}
func (f *fsJournal) runLoop() {
defer close(f.closed)
for {
select {
case je := <-f.incoming:
if err := f.putEvent(je); err != nil {
log.Errorw("failed to write out journal event", "event", je, "err", err)
}
case <-f.closing:
_ = f.fi.Close()
return
}
}
}

9
journal/global.go Normal file
View File

@ -0,0 +1,9 @@
package journal
var (
// J is a globally accessible Journal. It starts being NilJournal, and early
// during the Lotus initialization routine, it is reset to whichever Journal
// is configured (by default, the filesystem journal). Components can safely
// record in the journal by calling: journal.J.RecordEvent(...).
J Journal = NilJournal() // nolint
)

View File

@ -1,152 +0,0 @@
package journal
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
)
func InitializeSystemJournal(dir string) error {
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
j, err := OpenFSJournal(dir)
if err != nil {
return err
}
currentJournal = j
return nil
}
func Add(sys string, val interface{}) {
if currentJournal == nil {
log.Warn("no journal configured")
return
}
currentJournal.AddEntry(sys, val)
}
var log = logging.Logger("journal")
var currentJournal Journal
type Journal interface {
AddEntry(system string, obj interface{})
Close() error
}
// fsJournal is a basic journal backed by files on a filesystem
type fsJournal struct {
fi *os.File
fSize int64
journalDir string
incoming chan *JournalEntry
journalSizeLimit int64
closing chan struct{}
}
func OpenFSJournal(dir string) (Journal, error) {
fsj := &fsJournal{
journalDir: dir,
incoming: make(chan *JournalEntry, 32),
journalSizeLimit: 1 << 30,
closing: make(chan struct{}),
}
if err := fsj.rollJournalFile(); err != nil {
return nil, err
}
go fsj.runLoop()
return fsj, nil
}
type JournalEntry struct {
System string
Timestamp time.Time
Val interface{}
}
func (fsj *fsJournal) putEntry(je *JournalEntry) error {
b, err := json.Marshal(je)
if err != nil {
return err
}
n, err := fsj.fi.Write(append(b, '\n'))
if err != nil {
return err
}
fsj.fSize += int64(n)
if fsj.fSize >= fsj.journalSizeLimit {
return fsj.rollJournalFile()
}
return nil
}
const RFC3339nocolon = "2006-01-02T150405Z0700"
func (fsj *fsJournal) rollJournalFile() error {
if fsj.fi != nil {
err := fsj.fi.Close()
if err != nil {
return err
}
}
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(RFC3339nocolon))))
if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err)
}
fsj.fi = nfi
fsj.fSize = 0
return nil
}
func (fsj *fsJournal) runLoop() {
for {
select {
case je := <-fsj.incoming:
if err := fsj.putEntry(je); err != nil {
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
}
case <-fsj.closing:
if err := fsj.fi.Close(); err != nil {
log.Errorw("failed to close journal", "err", err)
}
return
}
}
}
func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
je := &JournalEntry{
System: system,
Timestamp: build.Clock.Now(),
Val: obj,
}
select {
case fsj.incoming <- je:
case <-fsj.closing:
log.Warnw("journal closed but tried to log event", "entry", je)
}
}
func (fsj *fsJournal) Close() error {
close(fsj.closing)
return nil
}

16
journal/nil.go Normal file
View File

@ -0,0 +1,16 @@
package journal
type nilJournal struct{}
// nilj is a singleton nil journal.
var nilj Journal = &nilJournal{}
func NilJournal() Journal {
return nilj
}
func (n *nilJournal) RegisterEventType(_, _ string) EventType { return EventType{} }
func (n *nilJournal) RecordEvent(_ EventType, _ func() interface{}) {}
func (n *nilJournal) Close() error { return nil }

57
journal/registry.go Normal file
View File

@ -0,0 +1,57 @@
package journal
import "sync"
// EventTypeRegistry is a component that constructs tracked EventType tokens,
// for usage with a Journal.
type EventTypeRegistry interface {
// RegisterEventType introduces a new event type to a journal, and
// returns an EventType token that components can later use to check whether
// journalling for that type is enabled/suppressed, and to tag journal
// entries appropriately.
RegisterEventType(system, event string) EventType
}
// eventTypeRegistry is an embeddable mixin that takes care of tracking disabled
// event types, and returning initialized/safe EventTypes when requested.
type eventTypeRegistry struct {
sync.Mutex
m map[string]EventType
}
var _ EventTypeRegistry = (*eventTypeRegistry)(nil)
func NewEventTypeRegistry(disabled DisabledEvents) EventTypeRegistry {
ret := &eventTypeRegistry{
m: make(map[string]EventType, len(disabled)+32), // + extra capacity.
}
for _, et := range disabled {
et.enabled, et.safe = false, true
ret.m[et.System+":"+et.Event] = et
}
return ret
}
func (d *eventTypeRegistry) RegisterEventType(system, event string) EventType {
d.Lock()
defer d.Unlock()
key := system + ":" + event
if et, ok := d.m[key]; ok {
return et
}
et := EventType{
System: system,
Event: event,
enabled: true,
safe: true,
}
d.m[key] = et
return et
}

49
journal/registry_test.go Normal file
View File

@ -0,0 +1,49 @@
package journal
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestDisabledEvents(t *testing.T) {
req := require.New(t)
test := func(dis DisabledEvents) func(*testing.T) {
return func(t *testing.T) {
registry := NewEventTypeRegistry(dis)
reg1 := registry.RegisterEventType("system1", "disabled1")
reg2 := registry.RegisterEventType("system2", "disabled2")
req.False(reg1.Enabled())
req.False(reg2.Enabled())
req.True(reg1.safe)
req.True(reg2.safe)
reg3 := registry.RegisterEventType("system3", "enabled3")
req.True(reg3.Enabled())
req.True(reg3.safe)
}
}
t.Run("direct", test(DisabledEvents{
EventType{System: "system1", Event: "disabled1"},
EventType{System: "system2", Event: "disabled2"},
}))
dis, err := ParseDisabledEvents("system1:disabled1,system2:disabled2")
req.NoError(err)
t.Run("parsed", test(dis))
dis, err = ParseDisabledEvents(" system1:disabled1 , system2:disabled2 ")
req.NoError(err)
t.Run("parsed_spaces", test(dis))
}
func TestParseDisableEvents(t *testing.T) {
_, err := ParseDisabledEvents("system1:disabled1:failed,system2:disabled2")
require.Error(t, err)
}

102
journal/types.go Normal file
View File

@ -0,0 +1,102 @@
package journal
import (
"fmt"
"strings"
"time"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("journal")
var (
// DefaultDisabledEvents lists the journal events disabled by
// default, usually because they are considered noisy.
DefaultDisabledEvents = DisabledEvents{
EventType{System: "mpool", Event: "add"},
EventType{System: "mpool", Event: "remove"},
}
)
// DisabledEvents is the set of event types whose journaling is suppressed.
type DisabledEvents []EventType
// ParseDisabledEvents parses a string of the form: "system1:event1,system2:event2[,...]"
// into a DisabledEvents object, returning an error if the string failed to parse.
//
// It sanitizes strings via strings.TrimSpace.
func ParseDisabledEvents(s string) (DisabledEvents, error) {
s = strings.TrimSpace(s) // sanitize
evts := strings.Split(s, ",")
ret := make(DisabledEvents, 0, len(evts))
for _, evt := range evts {
evt = strings.TrimSpace(evt) // sanitize
s := strings.Split(evt, ":")
if len(s) != 2 {
return nil, fmt.Errorf("invalid event type: %s", s)
}
ret = append(ret, EventType{System: s[0], Event: s[1]})
}
return ret, nil
}
// EventType represents the signature of an event.
type EventType struct {
System string
Event string
// enabled stores whether this event type is enabled.
enabled bool
// safe is a sentinel marker that's set to true if this EventType was
// constructed correctly (via Journal#RegisterEventType).
safe bool
}
func (et EventType) String() string {
return et.System + ":" + et.Event
}
// Enabled returns whether this event type is enabled in the journaling
// subsystem. Users are advised to check this before actually attempting to
// add a journal entry, as it helps bypass object construction for events that
// would be discarded anyway.
//
// All event types are enabled by default, and specific event types can only
// be disabled at Journal construction time.
func (et EventType) Enabled() bool {
return et.safe && et.enabled
}
// Journal represents an audit trail of system actions.
//
// Every entry is tagged with a timestamp, a system name, and an event name.
// The supplied data can be any type, as long as it is JSON serializable,
// including structs, map[string]interface{}, or primitive types.
//
// For cleanliness and type safety, we recommend to use typed events. See the
// *Evt struct types in this package for more info.
type Journal interface {
EventTypeRegistry
// RecordEvent records this event to the journal, if and only if the
// EventType is enabled. If so, it calls the supplier function to obtain
// the payload to record.
//
// Implementations MUST recover from panics raised by the supplier function.
RecordEvent(evtType EventType, supplier func() interface{})
// Close closes this journal for further writing.
Close() error
}
// Event represents a journal entry.
//
// See godocs on Journal for more information.
type Event struct {
EventType
Timestamp time.Time
Data interface{}
}

76
markets/journal.go Normal file
View File

@ -0,0 +1,76 @@
package markets
import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/journal"
)
type StorageClientEvt struct {
Event string
Deal storagemarket.ClientDeal
}
type StorageProviderEvt struct {
Event string
Deal storagemarket.MinerDeal
}
type RetrievalClientEvt struct {
Event string
Deal retrievalmarket.ClientDealState
}
type RetrievalProviderEvt struct {
Event string
Deal retrievalmarket.ProviderDealState
}
// StorageClientJournaler records journal events from the storage client.
func StorageClientJournaler(evtType journal.EventType) func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
return func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
journal.J.RecordEvent(evtType, func() interface{} {
return StorageClientEvt{
Event: storagemarket.ClientEvents[event],
Deal: deal,
}
})
}
}
// StorageProviderJournaler records journal events from the storage provider.
func StorageProviderJournaler(evtType journal.EventType) func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
return func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
journal.J.RecordEvent(evtType, func() interface{} {
return StorageProviderEvt{
Event: storagemarket.ProviderEvents[event],
Deal: deal,
}
})
}
}
// RetrievalClientJournaler records journal events from the retrieval client.
func RetrievalClientJournaler(evtType journal.EventType) func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
return func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
journal.J.RecordEvent(evtType, func() interface{} {
return RetrievalClientEvt{
Event: retrievalmarket.ClientEvents[event],
Deal: deal,
}
})
}
}
// RetrievalProviderJournaler records journal events from the retrieval provider.
func RetrievalProviderJournaler(evtType journal.EventType) func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
return func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
journal.J.RecordEvent(evtType, func() interface{} {
return RetrievalProviderEvt{
Event: retrievalmarket.ProviderEvents[event],
Deal: deal,
}
})
}
}

View File

@ -33,6 +33,11 @@ import (
var log = logging.Logger("miner")
// Journal event types.
const (
evtTypeBlockMined = iota
)
// returns a callback reporting whether we mined a blocks in this round
type waitFunc func(ctx context.Context, baseTime uint64) (func(bool, abi.ChainEpoch, error), abi.ChainEpoch, error)
@ -68,6 +73,9 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address,
sf: sf,
minedBlockHeights: arc,
evtTypes: [...]journal.EventType{
evtTypeBlockMined: journal.J.RegisterEventType("miner", "block_mined"),
},
}
}
@ -87,6 +95,8 @@ type Miner struct {
sf *slashfilter.SlashFilter
minedBlockHeights *lru.ARCCache
evtTypes [1]journal.EventType
}
func (m *Miner) Address() address.Address {
@ -220,12 +230,14 @@ func (m *Miner) mine(ctx context.Context) {
onDone(b != nil, h, nil)
if b != nil {
journal.Add("blockMined", map[string]interface{}{
"parents": base.TipSet.Cids(),
"nulls": base.NullRounds,
"epoch": b.Header.Height,
"timestamp": b.Header.Timestamp,
"cid": b.Header.Cid(),
journal.J.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} {
return map[string]interface{}{
"parents": base.TipSet.Cids(),
"nulls": base.NullRounds,
"epoch": b.Header.Height,
"timestamp": b.Header.Timestamp,
"cid": b.Header.Cid(),
}
})
btime := time.Unix(int64(b.Header.Timestamp), 0)

View File

@ -3,6 +3,7 @@ package node
import (
"context"
"errors"
"os"
"time"
logging "github.com/ipfs/go-log"
@ -44,6 +45,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/peermgr"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
@ -67,6 +69,10 @@ import (
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
// EnvJournalDisabledEvents is the environment variable through which disabled
// journal events can be customized.
const EnvJournalDisabledEvents = "LOTUS_JOURNAL_DISABLED_EVENTS"
//nolint:deadcode,varcheck
var log = logging.Logger("builder")
@ -91,11 +97,16 @@ var (
type invoke int
// Invokes are called in the order they are defined.
//nolint:golint
const (
// InitJournal at position 0 initializes the journal global var as soon as
// the system starts, so that it's available for all other components.
InitJournalKey = invoke(iota)
// libp2p
PstoreAddSelfKeysKey = invoke(iota)
PstoreAddSelfKeysKey
StartListeningKey
BootstrapKey
@ -123,7 +134,6 @@ const (
HeadMetricsKey
SettlePaymentChannelsKey
RunPeerTaggerKey
JournalKey
SetApiEndpointKey
@ -151,11 +161,25 @@ type Settings struct {
func defaults() []Option {
return []Option{
// global system journal.
Override(new(journal.DisabledEvents), func() journal.DisabledEvents {
if env, ok := os.LookupEnv(EnvJournalDisabledEvents); ok {
if ret, err := journal.ParseDisabledEvents(env); err == nil {
return ret
}
}
// fallback if env variable is not set, or if it failed to parse.
return journal.DefaultDisabledEvents
}),
Override(new(journal.Journal), modules.OpenFilesystemJournal),
Override(InitJournalKey, func(j journal.Journal) {
journal.J = j // eagerly sets the global journal through fx.Invoke.
}),
Override(new(helpers.MetricsCtx), context.Background),
Override(new(record.Validator), modules.RecordValidator),
Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)),
Override(new(dtypes.ShutdownChan), make(chan struct{})),
Override(JournalKey, modules.SetupJournal),
// Filecoin modules

View File

@ -26,7 +26,9 @@ import (
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/markets"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/node/impl/full"
@ -119,6 +121,10 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
c.SubscribeToEvents(marketevents.StorageClientLogger)
evtType := journal.J.RegisterEventType("markets/storage/client", "state_change")
c.SubscribeToEvents(markets.StorageClientJournaler(evtType))
return c.Start(ctx)
},
OnStop: func(context.Context) error {
@ -140,6 +146,10 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
evtType := journal.J.RegisterEventType("markets/retrieval/client", "state_change")
client.SubscribeToEvents(markets.RetrievalClientJournaler(evtType))
return nil
},
})

View File

@ -6,7 +6,6 @@ import (
"errors"
"io"
"io/ioutil"
"path/filepath"
"github.com/gbrlsnchs/jwt/v3"
logging "github.com/ipfs/go-log/v2"
@ -20,7 +19,6 @@ import (
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/addrutil"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
@ -107,7 +105,3 @@ func DrandBootstrap(ds dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) {
}
return res, nil
}
func SetupJournal(lr repo.LockedRepo) error {
return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal"))
}

View File

@ -1,6 +1,8 @@
package modules
import (
"context"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
eventbus "github.com/libp2p/go-eventbus"
@ -22,10 +24,12 @@ import (
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
)
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
@ -150,3 +154,16 @@ func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Sche
return shd, nil
}
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
jrnl, err := journal.OpenFSJournal(lr, disabled)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error { return jrnl.Close() },
})
return jrnl, err
}

View File

@ -49,6 +49,8 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/markets"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
@ -142,8 +144,34 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
return &sidsc{sc}
}
func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
type StorageMinerParams struct {
fx.In
Lifecycle fx.Lifecycle
MetricsCtx helpers.MetricsCtx
API lapi.FullNode
Host host.Host
MetadataDS dtypes.MetadataDS
Sealer sectorstorage.SectorManager
SectorIDCounter sealing.SectorIDCounter
Verifier ffiwrapper.Verifier
GetSealingConfigFn dtypes.GetSealingConfigFunc
}
func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) {
return func(params StorageMinerParams) (*storage.Miner, error) {
var (
ds = params.MetadataDS
mctx = params.MetricsCtx
lc = params.Lifecycle
api = params.API
sealer = params.Sealer
h = params.Host
sc = params.SectorIDCounter
verif = params.Verifier
gsd = params.GetSealingConfigFn
)
maddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
@ -187,6 +215,10 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
m.SubscribeToEvents(marketevents.RetrievalProviderLogger)
evtType := journal.J.RegisterEventType("markets/retrieval/provider", "state_change")
m.SubscribeToEvents(markets.RetrievalProviderJournaler(evtType))
return m.Start()
},
OnStop: func(context.Context) error {
@ -201,6 +233,10 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
h.SubscribeToEvents(marketevents.StorageProviderLogger)
evtType := journal.J.RegisterEventType("markets/storage/provider", "state_change")
h.SubscribeToEvents(markets.StorageProviderJournaler(evtType))
return h.Start(ctx)
},
OnStop: func(context.Context) error {

View File

@ -30,6 +30,7 @@ import (
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
@ -50,6 +51,17 @@ type Miner struct {
getSealConfig dtypes.GetSealingConfigFunc
sealing *sealing.Sealing
sealingEvtType journal.EventType
}
// SealingStateEvt is a journal event that records a sector state transition.
type SealingStateEvt struct {
SectorNumber abi.SectorNumber
SectorType abi.RegisteredSealProof
From sealing.SectorState
After sealing.SectorState
Error string
}
type storageMinerApi interface {
@ -103,9 +115,10 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
sc: sc,
verif: verif,
maddr: maddr,
worker: worker,
getSealConfig: gsd,
maddr: maddr,
worker: worker,
getSealConfig: gsd,
sealingEvtType: journal.J.RegisterEventType("storage", "sealing_states"),
}
return m, nil
@ -129,13 +142,25 @@ func (m *Miner) Run(ctx context.Context) error {
evts := events.NewEvents(ctx, m.api)
adaptedAPI := NewSealingAPIAdapter(m.api)
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig))
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications)
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
return nil
}
func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) {
journal.J.RecordEvent(m.sealingEvtType, func() interface{} {
return SealingStateEvt{
SectorNumber: before.SectorNumber,
SectorType: before.SectorType,
From: before.State,
After: after.State,
Error: after.LastErr,
}
})
}
func (m *Miner) Stop(ctx context.Context) error {
return m.sealing.Stop(ctx)
}

75
storage/wdpost_journal.go Normal file
View File

@ -0,0 +1,75 @@
package storage
import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
)
// SchedulerState defines the possible states in which the scheduler could be,
// for the purposes of journalling.
type SchedulerState string
const (
// SchedulerStateStarted gets recorded when a WdPoSt cycle for an
// epoch begins.
SchedulerStateStarted = SchedulerState("started")
// SchedulerStateAborted gets recorded when a WdPoSt cycle for an
// epoch is aborted, normally because of a chain reorg or advancement.
SchedulerStateAborted = SchedulerState("aborted")
// SchedulerStateFaulted gets recorded when a WdPoSt cycle for an
// epoch terminates abnormally, in which case the error is also recorded.
SchedulerStateFaulted = SchedulerState("faulted")
// SchedulerStateSucceeded gets recorded when a WdPoSt cycle for an
// epoch ends successfully.
SchedulerStateSucceeded = SchedulerState("succeeded")
)
// Journal event types.
const (
evtTypeWdPoStScheduler = iota
evtTypeWdPoStProofs
evtTypeWdPoStRecoveries
evtTypeWdPoStFaults
)
// evtCommon is a common set of attributes for Windowed PoSt journal events.
type evtCommon struct {
Deadline *dline.Info
Height abi.ChainEpoch
TipSet []cid.Cid
Error error `json:",omitempty"`
}
// WdPoStSchedulerEvt is the journal event that gets recorded on scheduler
// actions.
type WdPoStSchedulerEvt struct {
evtCommon
State SchedulerState
}
// WdPoStProofsProcessedEvt is the journal event that gets recorded when
// Windowed PoSt proofs have been processed.
type WdPoStProofsProcessedEvt struct {
evtCommon
Partitions []miner.PoStPartition
MessageCID cid.Cid `json:",omitempty"`
}
// WdPoStRecoveriesProcessedEvt is the journal event that gets recorded when
// Windowed PoSt recoveries have been processed.
type WdPoStRecoveriesProcessedEvt struct {
evtCommon
Declarations []miner.RecoveryDeclaration
MessageCID cid.Cid `json:",omitempty"`
}
// WdPoStFaultsProcessedEvt is the journal event that gets recorded when
// Windowed PoSt faults have been processed.
type WdPoStFaultsProcessedEvt struct {
evtCommon
Declarations []miner.FaultDeclaration
MessageCID cid.Cid `json:",omitempty"`
}

View File

@ -18,6 +18,8 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
@ -25,11 +27,19 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
)
var errNoPartitions = errors.New("no partitions")
func (s *WindowPoStScheduler) failPost(deadline *dline.Info) {
func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return WdPoStSchedulerEvt{
evtCommon: s.getEvtCommon(err),
State: SchedulerStateFaulted,
}
})
log.Errorf("TODO")
/*s.failLk.Lock()
if eps > s.failed {
@ -44,27 +54,56 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *dline.Info,
s.abort = abort
s.activeDeadline = deadline
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return WdPoStSchedulerEvt{
evtCommon: s.getEvtCommon(nil),
State: SchedulerStateStarted,
}
})
go func() {
defer abort()
ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.doPost")
defer span.End()
// recordProofsEvent records a successful proofs_processed event in the
// journal, even if it was a noop (no partitions).
recordProofsEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
return &WdPoStProofsProcessedEvt{
evtCommon: s.getEvtCommon(nil),
Partitions: partitions,
MessageCID: mcid,
}
})
}
proof, err := s.runPost(ctx, *deadline, ts)
switch err {
case errNoPartitions:
recordProofsEvent(nil, cid.Undef)
return
case nil:
if err := s.submitPost(ctx, proof); err != nil {
sm, err := s.submitPost(ctx, proof)
if err != nil {
log.Errorf("submitPost failed: %+v", err)
s.failPost(deadline)
s.failPost(err, deadline)
return
}
recordProofsEvent(proof.Partitions, sm.Cid())
default:
log.Errorf("runPost failed: %+v", err)
s.failPost(deadline)
s.failPost(err, deadline)
return
}
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return WdPoStSchedulerEvt{
evtCommon: s.getEvtCommon(nil),
State: SchedulerStateSucceeded,
}
})
}()
}
@ -113,25 +152,24 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
return sbf, nil
}
func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []*miner.Partition) error {
func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []*miner.Partition) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.checkNextRecoveries")
defer span.End()
faulty := uint64(0)
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: []miner.RecoveryDeclaration{},
}
faulty := uint64(0)
for partIdx, partition := range partitions {
unrecovered, err := bitfield.SubtractBitField(partition.Faults, partition.Recoveries)
if err != nil {
return xerrors.Errorf("subtracting recovered set from fault set: %w", err)
return nil, nil, xerrors.Errorf("subtracting recovered set from fault set: %w", err)
}
uc, err := unrecovered.Count()
if err != nil {
return xerrors.Errorf("counting unrecovered sectors: %w", err)
return nil, nil, xerrors.Errorf("counting unrecovered sectors: %w", err)
}
if uc == 0 {
@ -142,13 +180,13 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
recovered, err := s.checkSectors(ctx, unrecovered)
if err != nil {
return xerrors.Errorf("checking unrecovered sectors: %w", err)
return nil, nil, xerrors.Errorf("checking unrecovered sectors: %w", err)
}
// if all sectors failed to recover, don't declare recoveries
recoveredCount, err := recovered.Count()
if err != nil {
return xerrors.Errorf("counting recovered sectors: %w", err)
return nil, nil, xerrors.Errorf("counting recovered sectors: %w", err)
}
if recoveredCount == 0 {
@ -162,17 +200,18 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
})
}
if len(params.Recoveries) == 0 {
recoveries := params.Recoveries
if len(recoveries) == 0 {
if faulty != 0 {
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
}
return nil
return recoveries, nil, nil
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
}
msg := &types.Message{
@ -187,52 +226,51 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence)
if err != nil {
return xerrors.Errorf("declare faults recovered wait error: %w", err)
return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
}
if rec.Receipt.ExitCode != 0 {
return xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
}
return nil
return recoveries, sm, nil
}
func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []*miner.Partition) error {
func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []*miner.Partition) ([]miner.FaultDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.checkNextFaults")
defer span.End()
bad := uint64(0)
params := &miner.DeclareFaultsParams{
Faults: []miner.FaultDeclaration{},
}
bad := uint64(0)
for partIdx, partition := range partitions {
toCheck, err := partition.ActiveSectors()
if err != nil {
return xerrors.Errorf("getting active sectors: %w", err)
return nil, nil, xerrors.Errorf("getting active sectors: %w", err)
}
good, err := s.checkSectors(ctx, toCheck)
if err != nil {
return xerrors.Errorf("checking sectors: %w", err)
return nil, nil, xerrors.Errorf("checking sectors: %w", err)
}
faulty, err := bitfield.SubtractBitField(toCheck, good)
if err != nil {
return xerrors.Errorf("calculating faulty sector set: %w", err)
return nil, nil, xerrors.Errorf("calculating faulty sector set: %w", err)
}
c, err := faulty.Count()
if err != nil {
return xerrors.Errorf("counting faulty sectors: %w", err)
return nil, nil, xerrors.Errorf("counting faulty sectors: %w", err)
}
if c == 0 {
@ -248,15 +286,16 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
})
}
if len(params.Faults) == 0 {
return nil
faults := params.Faults
if len(faults) == 0 {
return faults, nil, nil
}
log.Errorw("DETECTED FAULTY SECTORS, declaring faults", "count", bad)
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return xerrors.Errorf("could not serialize declare faults parameters: %w", aerr)
return faults, nil, xerrors.Errorf("could not serialize declare faults parameters: %w", aerr)
}
msg := &types.Message{
@ -271,21 +310,21 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
sm, err := s.api.MpoolPushMessage(ctx, msg, spec)
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
return faults, sm, xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Warnw("declare faults Message CID", "cid", sm.Cid())
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence)
if err != nil {
return xerrors.Errorf("declare faults wait error: %w", err)
return faults, sm, xerrors.Errorf("declare faults wait error: %w", err)
}
if rec.Receipt.ExitCode != 0 {
return xerrors.Errorf("declare faults wait non-0 exit code: %d", rec.Receipt.ExitCode)
return faults, sm, xerrors.Errorf("declare faults wait non-0 exit code: %d", rec.Receipt.ExitCode)
}
return nil
return faults, sm, nil
}
func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) (*miner.SubmitWindowedPoStParams, error) {
@ -305,15 +344,49 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
return
}
if err := s.checkNextRecoveries(context.TODO(), declDeadline, partitions); err != nil {
var (
sigmsg *types.SignedMessage
recoveries []miner.RecoveryDeclaration
faults []miner.FaultDeclaration
// optionalCid returns the CID of the message, or cid.Undef is the
// message is nil. We don't need the argument (could capture the
// pointer), but it's clearer and purer like that.
optionalCid = func(sigmsg *types.SignedMessage) cid.Cid {
if sigmsg == nil {
return cid.Undef
}
return sigmsg.Cid()
}
)
if recoveries, sigmsg, err = s.checkNextRecoveries(context.TODO(), declDeadline, partitions); err != nil {
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
log.Errorf("checking sector recoveries: %v", err)
}
if err := s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
j := WdPoStRecoveriesProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: recoveries,
MessageCID: optionalCid(sigmsg),
}
j.Error = err
return j
})
if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil {
// TODO: This is also potentially really bad, but we try to post anyways
log.Errorf("checking sector faults: %v", err)
}
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
return WdPoStFaultsProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: faults,
MessageCID: optionalCid(sigmsg),
}
})
}()
buf := new(bytes.Buffer)
@ -498,13 +571,15 @@ func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors,
return proofSectors, nil
}
func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.SubmitWindowedPoStParams) error {
func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.SubmitWindowedPoStParams) (*types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
defer span.End()
var sm *types.SignedMessage
enc, aerr := actors.SerializeParams(proof)
if aerr != nil {
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
return nil, xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
}
msg := &types.Message{
@ -519,8 +594,9 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
// TODO: consider maybe caring about the output
sm, err := s.api.MpoolPushMessage(ctx, msg, spec)
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
return nil, xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Infof("Submitted window post: %s", sm.Cid())
@ -539,7 +615,7 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
log.Errorf("Submitting window post %s failed: exit %d", sm.Cid(), rec.Receipt.ExitCode)
}()
return nil
return sm, nil
}
func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) {

View File

@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/config"
"go.opencensus.io/trace"
@ -41,8 +42,10 @@ type WindowPoStScheduler struct {
activeDeadline *dline.Info
abort context.CancelFunc
//failed abi.ChainEpoch // eps
//failLk sync.Mutex
evtTypes [4]journal.EventType
// failed abi.ChainEpoch // eps
// failLk sync.Mutex
}
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, actor address.Address, worker address.Address) (*WindowPoStScheduler, error) {
@ -66,6 +69,12 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb
actor: actor,
worker: worker,
evtTypes: [...]journal.EventType{
evtTypeWdPoStScheduler: journal.J.RegisterEventType("wdpost", "scheduler"),
evtTypeWdPoStProofs: journal.J.RegisterEventType("wdpost", "proofs_processed"),
evtTypeWdPoStRecoveries: journal.J.RegisterEventType("wdpost", "recoveries_processed"),
evtTypeWdPoStFaults: journal.J.RegisterEventType("wdpost", "faults_processed"),
},
}, nil
}
@ -111,12 +120,13 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
log.Errorf("expected first notif to have len = 1")
continue
}
if changes[0].Type != store.HCCurrent {
chg := changes[0]
if chg.Type != store.HCCurrent {
log.Errorf("expected first notif to tell current ts")
continue
}
if err := s.update(ctx, changes[0].Val); err != nil {
if err := s.update(ctx, chg.Val); err != nil {
log.Errorf("%+v", err)
}
@ -220,10 +230,29 @@ func (s *WindowPoStScheduler) abortActivePoSt() {
if s.abort != nil {
s.abort()
}
log.Warnf("Aborting Window PoSt (Deadline: %+v)", s.activeDeadline)
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return WdPoStSchedulerEvt{
evtCommon: s.getEvtCommon(nil),
State: SchedulerStateAborted,
}
})
log.Warnf("Aborting Window PoSt (Deadline: %+v)", s.activeDeadline)
}
s.activeDeadline = nil
s.abort = nil
}
// getEvtCommon populates and returns common attributes from state, for a
// WdPoSt journal event.
func (s *WindowPoStScheduler) getEvtCommon(err error) evtCommon {
c := evtCommon{Error: err}
if s.cur != nil {
c.Deadline = s.activeDeadline
c.Height = s.cur.Height()
c.TipSet = s.cur.Cids()
}
return c
}