From 2ea5abdfb5cbba97d5c935a3ad3f292287e760cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 11 Aug 2020 13:48:32 +0100 Subject: [PATCH] wire journal into miner. --- cmd/lotus-storage-miner/init.go | 15 +++++++++++---- journal/fs.go | 13 ++++++------- journal/types.go | 9 +++++++++ miner/journal_events.go | 1 + miner/miner.go | 28 +++++++++++++++++++++------- node/builder.go | 9 ++------- node/modules/services.go | 17 +++++++++++++++++ node/modules/storageminer.go | 4 ++-- 8 files changed, 69 insertions(+), 27 deletions(-) create mode 100644 miner/journal_events.go diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 5007a7120..1714bd901 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -24,9 +24,6 @@ import ( "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" paramfetch "github.com/filecoin-project/go-paramfetch" - sectorstorage "github.com/filecoin-project/sector-storage" - "github.com/filecoin-project/sector-storage/ffiwrapper" - "github.com/filecoin-project/sector-storage/stores" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -34,6 +31,11 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/power" crypto2 "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/lotus/journal" + sectorstorage "github.com/filecoin-project/sector-storage" + "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/sector-storage/stores" + lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" @@ -446,7 +448,12 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, return err } - m := miner.NewMiner(api, epp, a, slashfilter.New(mds)) + jrnl, err := journal.OpenFSJournal(lr, journal.DefaultDisabledEvents) + if err != nil { + return fmt.Errorf("failed to open filesystem journal: %w", err) + } + + m := miner.NewMiner(api, epp, a, slashfilter.New(mds), jrnl) { if err := m.Start(ctx); err != nil { return xerrors.Errorf("failed to start up genesis miner: %w", err) diff --git a/journal/fs.go b/journal/fs.go index 096f5f1fd..91b0d1e6d 100644 --- a/journal/fs.go +++ b/journal/fs.go @@ -1,7 +1,6 @@ package journal import ( - "context" "encoding/json" "fmt" "os" @@ -10,7 +9,6 @@ import ( "time" logging "github.com/ipfs/go-log" - "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" @@ -33,11 +31,12 @@ type fsJournal struct { incoming chan *Event closing chan struct{} + closed chan struct{} } // OpenFSJournal constructs a rolling filesystem journal, with a default // per-file size limit of 1GiB. -func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents) (Journal, error) { +func OpenFSJournal(lr repo.LockedRepo, disabled DisabledEvents) (Journal, error) { dir := filepath.Join(lr.Path(), "journal") if err := os.MkdirAll(dir, 0755); err != nil { return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err) @@ -49,16 +48,13 @@ func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents) sizeLimit: 1 << 30, incoming: make(chan *Event, 32), closing: make(chan struct{}), + closed: make(chan struct{}), } if err := f.rollJournalFile(); err != nil { return nil, err } - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { return f.Close() }, - }) - go f.runLoop() return f, nil @@ -79,6 +75,7 @@ func (f *fsJournal) RecordEvent(evtType EventType, obj interface{}) { func (f *fsJournal) Close() error { close(f.closing) + <-f.closed return nil } @@ -117,6 +114,8 @@ func (f *fsJournal) rollJournalFile() error { } func (f *fsJournal) runLoop() { + defer close(f.closed) + for { select { case je := <-f.incoming: diff --git a/journal/types.go b/journal/types.go index 29954d9cf..6b6d53c08 100644 --- a/journal/types.go +++ b/journal/types.go @@ -5,6 +5,15 @@ import ( "time" ) +var ( + // DefaultDisabledEvents lists the journal events disabled by + // default, usually because they are considered noisy. + DefaultDisabledEvents = DisabledEvents{ + EventType{System: "mpool", Event: "add"}, + EventType{System: "mpool", Event: "remove"}, + } +) + // DisabledEvents is the set of event types whose journaling is suppressed. type DisabledEvents []EventType diff --git a/miner/journal_events.go b/miner/journal_events.go new file mode 100644 index 000000000..ab865910b --- /dev/null +++ b/miner/journal_events.go @@ -0,0 +1 @@ +package miner diff --git a/miner/miner.go b/miner/miner.go index b8bb9e562..091919b6c 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -31,6 +31,11 @@ import ( var log = logging.Logger("miner") +// Journal event types. +const ( + evtTypeBlockMined = iota +) + // returns a callback reporting whether we mined a blocks in this round type waitFunc func(ctx context.Context, baseTime uint64) (func(bool, error), abi.ChainEpoch, error) @@ -42,7 +47,7 @@ func randTimeOffset(width time.Duration) time.Duration { return val - (width / 2) } -func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter) *Miner { +func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter, jrnl journal.Journal) *Miner { arc, err := lru.NewARC(10000) if err != nil { panic(err) @@ -66,6 +71,10 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf: sf, minedBlockHeights: arc, + jrnl: jrnl, + evtTypes: [...]journal.EventType{ + evtTypeBlockMined: jrnl.RegisterEventType("miner", "block_mined"), + }, } } @@ -85,6 +94,9 @@ type Miner struct { sf *slashfilter.SlashFilter minedBlockHeights *lru.ARCCache + + jrnl journal.Journal + evtTypes [1]journal.EventType } func (m *Miner) Address() address.Address { @@ -213,12 +225,14 @@ func (m *Miner) mine(ctx context.Context) { onDone(b != nil, nil) if b != nil { - journal.Add("blockMined", map[string]interface{}{ - "parents": base.TipSet.Cids(), - "nulls": base.NullRounds, - "epoch": b.Header.Height, - "timestamp": b.Header.Timestamp, - "cid": b.Header.Cid(), + journal.MaybeRecordEvent(m.jrnl, m.evtTypes[evtTypeBlockMined], func() interface{} { + return map[string]interface{}{ + "parents": base.TipSet.Cids(), + "nulls": base.NullRounds, + "epoch": b.Header.Height, + "timestamp": b.Header.Timestamp, + "cid": b.Header.Cid(), + } }) btime := time.Unix(int64(b.Header.Timestamp), 0) diff --git a/node/builder.go b/node/builder.go index 86ef23192..580aec3df 100644 --- a/node/builder.go +++ b/node/builder.go @@ -156,13 +156,8 @@ func defaults() []Option { Override(new(record.Validator), modules.RecordValidator), Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)), Override(new(dtypes.ShutdownChan), make(chan struct{})), - Override(new(journal.Journal), journal.OpenFSJournal), - - // By default, disable noisy mpool events; keep only mpool:repub on. - Override(new(journal.DisabledEvents), journal.DisabledEvents{ - journal.EventType{System: "mpool", Event: "add"}, - journal.EventType{System: "mpool", Event: "remove"}, - }), + Override(new(journal.Journal), modules.OpenFilesystemJournal), + Override(new(journal.DisabledEvents), journal.DefaultDisabledEvents), Override(InitJournalKey, func(j journal.Journal) { /* forces the creation of the journal at startup */ }), // Filecoin modules diff --git a/node/modules/services.go b/node/modules/services.go index 0d148ffb4..def286913 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -1,6 +1,8 @@ package modules import ( + "context" + "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" eventbus "github.com/libp2p/go-eventbus" @@ -22,10 +24,12 @@ import ( "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/sub" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/node/repo" ) func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error { @@ -141,3 +145,16 @@ func RandomBeacon(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Random //return beacon.NewMockBeacon(build.BlockDelaySecs * time.Second) return drand.NewDrandBeacon(gen.Timestamp, build.BlockDelaySecs, p.PubSub, p.DrandConfig) } + +func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) { + jrnl, err := journal.OpenFSJournal(lr, disabled) + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { return jrnl.Close() }, + }) + + return jrnl, err +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index a06e44198..fc7458f8f 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -340,13 +340,13 @@ func StagingGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.Stagi return gs } -func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter) (*miner.Miner, error) { +func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, jrnl journal.Journal) (*miner.Miner, error) { minerAddr, err := minerAddrFromDS(ds) if err != nil { return nil, err } - m := miner.NewMiner(api, epp, minerAddr, sf) + m := miner.NewMiner(api, epp, minerAddr, sf, jrnl) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error {