From 4e1ef09751ab8d937c16d7012a977cad93d3c492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 26 Aug 2020 16:09:37 +0100 Subject: [PATCH] make journal a global var. --- chain/gen/gen.go | 3 +- chain/gen/genesis/genesis.go | 3 +- chain/messagepool/messagepool.go | 15 +++-- chain/messagepool/messagepool_test.go | 18 +++--- chain/messagepool/selection_test.go | 11 ++-- chain/store/index_test.go | 12 ++-- chain/store/store.go | 8 +-- chain/store/store_test.go | 4 +- chain/validation/applier.go | 2 +- cmd/lotus-bench/import.go | 2 +- cmd/lotus-storage-miner/init.go | 7 ++- cmd/lotus/daemon.go | 7 +-- go.sum | 17 +++++ journal/fs.go | 28 ++++++--- journal/global.go | 9 +++ journal/nil.go | 2 +- journal/registry.go | 57 +++++++++++++++++ journal/types.go | 89 +++------------------------ markets/storageadapter/client.go | 15 ++--- markets/storageadapter/provider.go | 24 ++++---- miner/miner.go | 8 +-- node/builder.go | 10 ++- node/modules/chain.go | 9 ++- node/modules/storageminer.go | 14 ++--- storage/miner.go | 8 +-- storage/wdpost_run.go | 12 ++-- storage/wdpost_sched.go | 8 +-- 27 files changed, 202 insertions(+), 200 deletions(-) create mode 100644 journal/global.go create mode 100644 journal/registry.go diff --git a/chain/gen/gen.go b/chain/gen/gen.go index a58c34764..d4851a933 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -35,7 +35,6 @@ import ( "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" "github.com/filecoin-project/lotus/genesis" - "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/node/repo" @@ -221,7 +220,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { return nil, xerrors.Errorf("make genesis block failed: %w", err) } - cs := store.NewChainStore(bs, ds, sys, journal.NilJournal()) + cs := store.NewChainStore(bs, ds, sys) genfb := &types.FullBlock{Header: genb.Genesis} gents := store.NewFullTipSet([]*types.FullBlock{genfb}) diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index 16f8d5307..1731d780a 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -27,7 +27,6 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/genesis" - "github.com/filecoin-project/lotus/journal" bstore "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/lib/sigs" ) @@ -398,7 +397,7 @@ func MakeGenesisBlock(ctx context.Context, bs bstore.Blockstore, sys vm.SyscallB } // temp chainstore - cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys, journal.NilJournal()) + cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys) // Verify PreSealed Data stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 19856b9f7..ab23af856 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -234,7 +234,7 @@ func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.Tip return baseFee, nil } -func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, jrnl journal.Journal) (*MessagePool, error) { +func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) verifcache, _ := lru.New2Q(build.VerifSigCacheSize) @@ -263,11 +263,10 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, jrnl jo cfg: cfg, rbfNum: types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom)), rbfDenom: types.NewInt(RbfDenom), - jrnl: jrnl, evtTypes: [...]journal.EventType{ - evtTypeMpoolAdd: jrnl.RegisterEventType("mpool", "add"), - evtTypeMpoolRemove: jrnl.RegisterEventType("mpool", "remove"), - evtTypeMpoolRepub: jrnl.RegisterEventType("mpool", "repub"), + evtTypeMpoolAdd: journal.J.RegisterEventType("mpool", "add"), + evtTypeMpoolRemove: journal.J.RegisterEventType("mpool", "remove"), + evtTypeMpoolRepub: journal.J.RegisterEventType("mpool", "repub"), }, } @@ -372,7 +371,7 @@ func (mp *MessagePool) runLoop() { } if len(outputMsgs) > 0 { - journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolRepub], func() interface{} { + journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} { msgs := make([]MessagePoolEvt_Message, 0, len(outputMsgs)) for _, m := range outputMsgs { msgs = append(msgs, MessagePoolEvt_Message{Message: m.Message, CID: m.Cid()}) @@ -584,7 +583,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { Message: m, }, localUpdates) - journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolAdd], func() interface{} { + journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolAdd], func() interface{} { return MessagePoolEvt{ Action: "add", Messages: []MessagePoolEvt_Message{{Message: m.Message, CID: m.Cid()}}, @@ -732,7 +731,7 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64) { Message: m, }, localUpdates) - journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolRemove], func() interface{} { + journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRemove], func() interface{} { return MessagePoolEvt{ Action: "remove", Messages: []MessagePoolEvt_Message{{Message: m.Message, CID: m.Cid()}}} diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 06541d7e5..a16f61c48 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -6,17 +6,17 @@ import ( "testing" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/chain/types/mock" - "github.com/filecoin-project/lotus/chain/wallet" - "github.com/filecoin-project/lotus/journal" - _ "github.com/filecoin-project/lotus/lib/sigs/bls" - _ "github.com/filecoin-project/lotus/lib/sigs/secp" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" + "github.com/filecoin-project/lotus/chain/wallet" + _ "github.com/filecoin-project/lotus/lib/sigs/bls" + _ "github.com/filecoin-project/lotus/lib/sigs/secp" ) func init() { @@ -174,7 +174,7 @@ func TestMessagePool(t *testing.T) { ds := datastore.NewMapDatastore() - mp, err := New(tma, ds, "mptest", journal.NilJournal()) + mp, err := New(tma, ds, "mptest") if err != nil { t.Fatal(err) } @@ -215,7 +215,7 @@ func TestRevertMessages(t *testing.T) { ds := datastore.NewMapDatastore() - mp, err := New(tma, ds, "mptest", journal.NilJournal()) + mp, err := New(tma, ds, "mptest") if err != nil { t.Fatal(err) } @@ -271,7 +271,7 @@ func TestPruningSimple(t *testing.T) { ds := datastore.NewMapDatastore() - mp, err := New(tma, ds, "mptest", journal.NilJournal()) + mp, err := New(tma, ds, "mptest") if err != nil { t.Fatal(err) } diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 91337d63c..af3f88807 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -5,16 +5,15 @@ import ( "testing" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-datastore" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/messagepool/gasguess" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/mock" "github.com/filecoin-project/lotus/chain/wallet" - "github.com/filecoin-project/lotus/journal" - - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/filecoin-project/specs-actors/actors/crypto" - "github.com/ipfs/go-datastore" _ "github.com/filecoin-project/lotus/lib/sigs/bls" _ "github.com/filecoin-project/lotus/lib/sigs/secp" @@ -44,7 +43,7 @@ func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, g func makeTestMpool() (*MessagePool, *testMpoolAPI) { tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() - mp, err := New(tma, ds, "test", journal.NilJournal()) + mp, err := New(tma, ds, "test") if err != nil { panic(err) } diff --git a/chain/store/index_test.go b/chain/store/index_test.go index 2a0fdc2d1..38cad96dd 100644 --- a/chain/store/index_test.go +++ b/chain/store/index_test.go @@ -5,15 +5,15 @@ import ( "context" "testing" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/ipfs/go-datastore" + syncds "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/assert" + "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types/mock" - "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/blockstore" - "github.com/filecoin-project/specs-actors/actors/abi" - datastore "github.com/ipfs/go-datastore" - syncds "github.com/ipfs/go-datastore/sync" - "github.com/stretchr/testify/assert" ) func TestIndexSeeks(t *testing.T) { @@ -32,7 +32,7 @@ func TestIndexSeeks(t *testing.T) { ctx := context.TODO() nbs := blockstore.NewTemporarySync() - cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, journal.NilJournal()) + cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil) _, err = cs.Import(bytes.NewReader(gencar)) if err != nil { diff --git a/chain/store/store.go b/chain/store/store.go index 198f18756..053da258b 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -96,11 +96,10 @@ type ChainStore struct { vmcalls vm.SyscallBuilder - journal journal.Journal evtTypes [1]journal.EventType } -func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, jrnl journal.Journal) *ChainStore { +func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore { c, _ := lru.NewARC(2048) tsc, _ := lru.NewARC(4096) cs := &ChainStore{ @@ -111,11 +110,10 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB mmCache: c, tsCache: tsc, vmcalls: vmcalls, - journal: jrnl, } cs.evtTypes = [1]journal.EventType{ - evtTypeHeadChange: jrnl.RegisterEventType("sync", "head_change"), + evtTypeHeadChange: journal.J.RegisterEventType("sync", "head_change"), } ci := NewChainIndex(cs.LoadTipSet) @@ -344,7 +342,7 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo continue } - journal.MaybeRecordEvent(cs.journal, cs.evtTypes[evtTypeHeadChange], func() interface{} { + journal.J.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} { return HeadChangeEvt{ From: r.old.Key(), FromHeight: r.old.Height(), diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 56a78330a..e52a77570 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -68,7 +68,7 @@ func BenchmarkGetRandomness(b *testing.B) { bs := blockstore.NewBlockstore(bds) - cs := store.NewChainStore(bs, mds, nil, journal.NilJournal()) + cs := store.NewChainStore(bs, mds, nil) b.ResetTimer() @@ -102,7 +102,7 @@ func TestChainExportImport(t *testing.T) { } nbs := blockstore.NewTemporary() - cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, journal.NilJournal()) + cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil) root, err := cs.Import(buf) if err != nil { diff --git a/chain/validation/applier.go b/chain/validation/applier.go index 9b688b8f0..c67a91c1a 100644 --- a/chain/validation/applier.go +++ b/chain/validation/applier.go @@ -69,7 +69,7 @@ func (a *Applier) ApplySignedMessage(epoch abi.ChainEpoch, msg *vtypes.SignedMes } func (a *Applier) ApplyTipSetMessages(epoch abi.ChainEpoch, blocks []vtypes.BlockMessagesInfo, rnd vstate.RandomnessSource) (vtypes.ApplyTipSetResult, error) { - cs := store.NewChainStore(a.stateWrapper.bs, a.stateWrapper.ds, a.syscalls, journal.NilJournal()) + cs := store.NewChainStore(a.stateWrapper.bs, a.stateWrapper.ds, a.syscalls) sm := stmgr.NewStateManager(cs) var bms []stmgr.BlockMessages diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index 03035880d..c7f687eb7 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -87,7 +87,7 @@ var importBenchCmd = &cli.Command{ } bs = cbs ds := datastore.NewMapDatastore() - cs := store.NewChainStore(bs, ds, vm.Syscalls(ffiwrapper.ProofVerifier), journal.NilJournal()) + cs := store.NewChainStore(bs, ds, vm.Syscalls(ffiwrapper.ProofVerifier)) stm := stmgr.NewStateManager(cs) prof, err := os.Create("import-bench.prof") diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 1714bd901..fc1e417ce 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -448,12 +448,13 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, return err } - jrnl, err := journal.OpenFSJournal(lr, journal.DefaultDisabledEvents) - if err != nil { + if jrnl, err := journal.OpenFSJournal(lr, journal.DefaultDisabledEvents); err == nil { + journal.J = jrnl + } else { return fmt.Errorf("failed to open filesystem journal: %w", err) } - m := miner.NewMiner(api, epp, a, slashfilter.New(mds), jrnl) + m := miner.NewMiner(api, epp, a, slashfilter.New(mds)) { if err := m.Start(ctx); err != nil { return xerrors.Errorf("failed to start up genesis miner: %w", err) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index dc5168d69..52e8976c7 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -12,9 +12,6 @@ import ( "runtime/pprof" "strings" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/journal" - paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/mitchellh/go-homedir" "github.com/multiformats/go-multiaddr" @@ -25,6 +22,8 @@ import ( "go.opencensus.io/tag" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/stmgr" @@ -338,7 +337,7 @@ func ImportChain(r repo.Repo, fname string) error { bs := blockstore.NewBlockstore(ds) - cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), journal.NilJournal()) + cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier)) log.Info("importing chain from file...") ts, err := cst.Import(fi) diff --git a/go.sum b/go.sum index ca9f0df33..37703fd02 100644 --- a/go.sum +++ b/go.sum @@ -1230,6 +1230,7 @@ github.com/prometheus/procfs v0.1.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y= github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -1327,6 +1328,7 @@ github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFd github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/texttheater/golang-levenshtein v0.0.0-20180516184445-d188e65d659e/go.mod h1:XDKHRm5ThF8YJjx001LtgelzsoaEcvnA7lVWz9EeX3g= +github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKwh4= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-client-go v2.23.1+incompatible h1:uArBYHQR0HqLFFAypI7RsWTzPSj/bDpmZZuQjMLSg1A= @@ -1395,11 +1397,14 @@ github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1: github.com/whyrusleeping/yamux v1.1.5/go.mod h1:E8LnQQ8HKx5KD29HZFUwM1PxCOdPRzGwur1mcYhXcD8= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/xlab/c-for-go v0.0.0-20200718154222-87b0065af829/go.mod h1:h/1PEBwj7Ym/8kOuMWvO2ujZ6Lt+TMbySEXNhjjR87I= +github.com/xlab/pkgconfig v0.0.0-20170226114623-cea12a0fd245/go.mod h1:C+diUUz7pxhNY6KAoLgrTYARGWnt82zWTylZlxT92vk= github.com/xorcare/golden v0.6.0/go.mod h1:7T39/ZMvaSEZlBPoYfVFmsBLmUl3uz9IuzWj/U6FtvQ= github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 h1:oWgZJmC1DorFZDpfMfWg7xk29yEOZiXmo/wZl+utTI8= github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542/go.mod h1:7T39/ZMvaSEZlBPoYfVFmsBLmUl3uz9IuzWj/U6FtvQ= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw= go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= @@ -1482,6 +1487,8 @@ golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 h1:vEg9joUBmeBcK9iSJftGNf3coIG4HqZElCPehJsfAYM= golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1515,6 +1522,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0 h1:KU7oHjnv3XNWfa5COkzUifxZmxp1TyI7ImMXqFxLwvQ= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180524181706-dfa909b99c79/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1557,6 +1565,8 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200519113804-d87ec0cfa476/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM= golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200625001655-4c5254603344 h1:vGXIOMxbNfDTk/aXCmfdLgkrSV+Z2tcbze+pEc3v5W4= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1695,6 +1705,7 @@ golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200318150045-ba25ddc85566/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4 h1:kDtqNkeBrZb8B+atrj50B5XLHpzXXqcCdZPP/ApQ5NY= golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= +golang.org/x/tools v0.0.0-20200711155855-7342f9734a7d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= @@ -1808,6 +1819,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= @@ -1825,6 +1837,11 @@ howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqp howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0= launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54= launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM= +modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= +modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= +modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= +modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/journal/fs.go b/journal/fs.go index 73a2fff43..7d5203556 100644 --- a/journal/fs.go +++ b/journal/fs.go @@ -16,7 +16,7 @@ import ( // fsJournal is a basic journal backed by files on a filesystem. type fsJournal struct { - EventTypeFactory + EventTypeRegistry dir string sizeLimit int64 @@ -40,12 +40,12 @@ func OpenFSJournal(lr repo.LockedRepo, disabled DisabledEvents) (Journal, error) } f := &fsJournal{ - EventTypeFactory: NewEventTypeFactory(disabled), - dir: dir, - sizeLimit: 1 << 30, - incoming: make(chan *Event, 32), - closing: make(chan struct{}), - closed: make(chan struct{}), + EventTypeRegistry: NewEventTypeRegistry(disabled), + dir: dir, + sizeLimit: 1 << 30, + incoming: make(chan *Event, 32), + closing: make(chan struct{}), + closed: make(chan struct{}), } if err := f.rollJournalFile(); err != nil { @@ -57,11 +57,21 @@ func OpenFSJournal(lr repo.LockedRepo, disabled DisabledEvents) (Journal, error) return f, nil } -func (f *fsJournal) RecordEvent(evtType EventType, obj interface{}) { +func (f *fsJournal) RecordEvent(evtType EventType, supplier func() interface{}) { + defer func() { + if r := recover(); r != nil { + log.Warnf("recovered from panic while recording journal event; type=%s, err=%v", evtType, r) + } + }() + + if !evtType.Enabled() { + return + } + je := &Event{ EventType: evtType, Timestamp: build.Clock.Now(), - Data: obj, + Data: supplier(), } select { case f.incoming <- je: diff --git a/journal/global.go b/journal/global.go new file mode 100644 index 000000000..b4d0e0a1b --- /dev/null +++ b/journal/global.go @@ -0,0 +1,9 @@ +package journal + +var ( + // J is a globally accessible Journal. It starts being NilJournal, and early + // during the Lotus initialization routine, it is reset to whichever Journal + // is configured (by default, the filesystem journal). Components can safely + // record in the journal by calling: journal.J.RecordEvent(...). + J Journal = NilJournal() // nolint +) diff --git a/journal/nil.go b/journal/nil.go index 5d0c78b05..fa72fa373 100644 --- a/journal/nil.go +++ b/journal/nil.go @@ -11,6 +11,6 @@ func NilJournal() Journal { func (n *nilJournal) RegisterEventType(_, _ string) EventType { return EventType{} } -func (n *nilJournal) RecordEvent(_ EventType, _ interface{}) {} +func (n *nilJournal) RecordEvent(_ EventType, _ func() interface{}) {} func (n *nilJournal) Close() error { return nil } diff --git a/journal/registry.go b/journal/registry.go new file mode 100644 index 000000000..6ab5b5fb1 --- /dev/null +++ b/journal/registry.go @@ -0,0 +1,57 @@ +package journal + +import "sync" + +// EventTypeRegistry is a component that constructs tracked EventType tokens, +// for usage with a Journal. +type EventTypeRegistry interface { + + // RegisterEventType introduces a new event type to a journal, and + // returns an EventType token that components can later use to check whether + // journalling for that type is enabled/suppressed, and to tag journal + // entries appropriately. + RegisterEventType(system, event string) EventType +} + +// eventTypeRegistry is an embeddable mixin that takes care of tracking disabled +// event types, and returning initialized/safe EventTypes when requested. +type eventTypeRegistry struct { + sync.Mutex + + m map[string]EventType +} + +var _ EventTypeRegistry = (*eventTypeRegistry)(nil) + +func NewEventTypeRegistry(disabled DisabledEvents) EventTypeRegistry { + ret := &eventTypeRegistry{ + m: make(map[string]EventType, len(disabled)+32), // + extra capacity. + } + + for _, et := range disabled { + et.enabled, et.safe = false, true + ret.m[et.System+":"+et.Event] = et + } + + return ret +} + +func (d *eventTypeRegistry) RegisterEventType(system, event string) EventType { + d.Lock() + defer d.Unlock() + + key := system + ":" + event + if et, ok := d.m[key]; ok { + return et + } + + et := EventType{ + System: system, + Event: event, + enabled: true, + safe: true, + } + + d.m[key] = et + return et +} diff --git a/journal/types.go b/journal/types.go index 6183e6cfa..c81fb8c89 100644 --- a/journal/types.go +++ b/journal/types.go @@ -1,7 +1,6 @@ package journal import ( - "sync" "time" logging "github.com/ipfs/go-log" @@ -49,16 +48,6 @@ func (et EventType) Enabled() bool { return et.safe && et.enabled } -// EventTypeFactory is a component that constructs tracked EventType tokens, -// for usage with a Journal. -type EventTypeFactory interface { - // RegisterEventType introduces a new event type to a journal, and - // returns an EventType token that components can later use to check whether - // journalling for that type is enabled/suppressed, and to tag journal - // entries appropriately. - RegisterEventType(system, event string) EventType -} - // Journal represents an audit trail of system actions. // // Every entry is tagged with a timestamp, a system name, and an event name. @@ -68,11 +57,14 @@ type EventTypeFactory interface { // For cleanliness and type safety, we recommend to use typed events. See the // *Evt struct types in this package for more info. type Journal interface { - EventTypeFactory + EventTypeRegistry - // RecordEvent records this event to the journal. See godocs on the Journal type - // for more info. - RecordEvent(evtType EventType, data interface{}) + // RecordEvent records this event to the journal, if and only if the + // EventType is enabled. If so, it calls the supplier function to obtain + // the payload to record. + // + // Implementations MUST recover from panics raised by the supplier function. + RecordEvent(evtType EventType, supplier func() interface{}) // Close closes this journal for further writing. Close() error @@ -87,70 +79,3 @@ type Event struct { Timestamp time.Time Data interface{} } - -// MaybeRecordEvent is a convenience function that evaluates if the EventType is -// enabled, and if so, it calls the supplier to create the event and -// subsequently journal.RecordEvent on the provided journal to record it. -// -// It also recovers from panics raised when calling the supplier function. -// -// This is safe to call with a nil Journal, either because the value is nil, -// or because a journal obtained through NilJournal() is in use. -func MaybeRecordEvent(journal Journal, evtType EventType, supplier func() interface{}) { - defer func() { - if r := recover(); r != nil { - log.Warnf("recovered from panic while recording journal event; type=%s, err=%v", evtType, r) - } - }() - - if journal == nil || journal == nilj { - return - } - if !evtType.Enabled() { - return - } - journal.RecordEvent(evtType, supplier()) -} - -// eventTypeFactory is an embeddable mixin that takes care of tracking disabled -// event types, and returning initialized/safe EventTypes when requested. -type eventTypeFactory struct { - sync.Mutex - - m map[string]EventType -} - -var _ EventTypeFactory = (*eventTypeFactory)(nil) - -func NewEventTypeFactory(disabled DisabledEvents) EventTypeFactory { - ret := &eventTypeFactory{ - m: make(map[string]EventType, len(disabled)+32), // + extra capacity. - } - - for _, et := range disabled { - et.enabled, et.safe = false, true - ret.m[et.System+":"+et.Event] = et - } - - return ret -} - -func (d *eventTypeFactory) RegisterEventType(system, event string) EventType { - d.Lock() - defer d.Unlock() - - key := system + ":" + event - if et, ok := d.m[key]; ok { - return et - } - - et := EventType{ - System: system, - Event: event, - enabled: true, - safe: true, - } - - d.m[key] = et - return et -} diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 9186b94c0..761a4aa74 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -43,8 +43,6 @@ type ClientNodeAdapter struct { fm *market.FundMgr ev *events.Events - jrnl journal.Journal - // index 0 is unused, as it corresponds to evtTypeDealAccepted, a provider-only event. evtTypes [4]journal.EventType } @@ -54,7 +52,7 @@ type clientApi struct { full.StateAPI } -func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr, jrnl journal.Journal) storagemarket.StorageClientNode { +func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr) storagemarket.StorageClientNode { return &ClientNodeAdapter{ StateAPI: state, ChainAPI: chain, @@ -65,11 +63,10 @@ func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.M fm: fm, ev: events.NewEvents(context.TODO(), &clientApi{chain, state}), - jrnl: jrnl, evtTypes: [...]journal.EventType{ - evtTypeDealSectorCommitted: jrnl.RegisterEventType("markets:storage:client", "deal_sector_committed"), - evtTypeDealExpired: jrnl.RegisterEventType("markets:storage:client", "deal_expired"), - evtTypeDealSlashed: jrnl.RegisterEventType("markets:storage:client", "deal_slashed"), + evtTypeDealSectorCommitted: journal.J.RegisterEventType("markets:storage:client", "deal_sector_committed"), + evtTypeDealExpired: journal.J.RegisterEventType("markets:storage:client", "deal_expired"), + evtTypeDealSlashed: journal.J.RegisterEventType("markets:storage:client", "deal_slashed"), }, } } @@ -236,7 +233,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor } dealID := res.IDs[dealIdx] - journal.MaybeRecordEvent(c.jrnl, c.evtTypes[evtTypeDealAccepted], func() interface{} { + journal.J.RecordEvent(c.evtTypes[evtTypeDealAccepted], func() interface{} { deal := deal // copy and strip fields we don't want to log to the journal deal.ClientSignature = crypto.Signature{} return ClientDealAcceptedEvt{ID: dealID, Deal: deal, Height: c.cs.GetHeaviestTipSet().Height()} @@ -294,7 +291,7 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch) - journal.MaybeRecordEvent(c.jrnl, c.evtTypes[evtTypeDealSectorCommitted], func() interface{} { + journal.J.RecordEvent(c.evtTypes[evtTypeDealSectorCommitted], func() interface{} { return ClientDealSectorCommittedEvt{ID: dealId, State: sd.State, Height: curH} }) diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 8e31939df..d772a36a8 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -47,23 +47,21 @@ type ProviderNodeAdapter struct { secb *sectorblocks.SectorBlocks ev *events.Events - jrnl journal.Journal evtTypes [4]journal.EventType } -func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, jrnl journal.Journal) storagemarket.StorageProviderNode { +func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { return &ProviderNodeAdapter{ FullNode: full, dag: dag, secb: secb, ev: events.NewEvents(context.TODO(), full), - jrnl: jrnl, evtTypes: [...]journal.EventType{ - evtTypeDealAccepted: jrnl.RegisterEventType("markets:storage:provider", "deal_complete"), - evtTypeDealSectorCommitted: jrnl.RegisterEventType("markets:storage:provider", "deal_sector_committed"), - evtTypeDealExpired: jrnl.RegisterEventType("markets:storage:provider", "deal_expired"), - evtTypeDealSlashed: jrnl.RegisterEventType("markets:storage:provider", "deal_slashed"), + evtTypeDealAccepted: journal.J.RegisterEventType("markets:storage:provider", "deal_complete"), + evtTypeDealSectorCommitted: journal.J.RegisterEventType("markets:storage:provider", "deal_sector_committed"), + evtTypeDealExpired: journal.J.RegisterEventType("markets:storage:provider", "deal_expired"), + evtTypeDealSlashed: journal.J.RegisterEventType("markets:storage:provider", "deal_slashed"), }, } } @@ -113,7 +111,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema log.Warnf("New Deal: deal %d", deal.DealID) - journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealAccepted], func() interface{} { + journal.J.RecordEvent(n.evtTypes[evtTypeDealAccepted], func() interface{} { deal := deal // copy and strip fields we don't want to log to the journal deal.ClientSignature = crypto.Signature{} return MinerDealAcceptedEvt{ID: deal.DealID, Deal: deal} @@ -297,7 +295,7 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch) - journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSectorCommitted], func() interface{} { + journal.J.RecordEvent(n.evtTypes[evtTypeDealSectorCommitted], func() interface{} { return MinerDealSectorCommittedEvt{ID: dealID, State: sd.State, Height: curH} }) @@ -407,7 +405,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Check if the deal has already expired if sd.Proposal.EndEpoch <= height { onDealExpired(nil) - journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} { + journal.J.RecordEvent(n.evtTypes[evtTypeDealExpired], func() interface{} { return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height} }) return true, false, nil @@ -416,7 +414,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // If there is no deal assume it's already been slashed if sd.State.SectorStartEpoch < 0 { onDealSlashed(height, nil) - journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} { + journal.J.RecordEvent(n.evtTypes[evtTypeDealSlashed], func() interface{} { return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height} }) return true, false, nil @@ -435,7 +433,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Check if the deal has already expired if sd.Proposal.EndEpoch <= height { onDealExpired(nil) - journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} { + journal.J.RecordEvent(n.evtTypes[evtTypeDealExpired], func() interface{} { return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height} }) return false, nil @@ -461,7 +459,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Deal was slashed if deal.To == nil { onDealSlashed(height, nil) - journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} { + journal.J.RecordEvent(n.evtTypes[evtTypeDealSlashed], func() interface{} { return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height} }) return false, nil diff --git a/miner/miner.go b/miner/miner.go index 091919b6c..bf1874f53 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -47,7 +47,7 @@ func randTimeOffset(width time.Duration) time.Duration { return val - (width / 2) } -func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter, jrnl journal.Journal) *Miner { +func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter) *Miner { arc, err := lru.NewARC(10000) if err != nil { panic(err) @@ -71,9 +71,8 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf: sf, minedBlockHeights: arc, - jrnl: jrnl, evtTypes: [...]journal.EventType{ - evtTypeBlockMined: jrnl.RegisterEventType("miner", "block_mined"), + evtTypeBlockMined: journal.J.RegisterEventType("miner", "block_mined"), }, } } @@ -95,7 +94,6 @@ type Miner struct { sf *slashfilter.SlashFilter minedBlockHeights *lru.ARCCache - jrnl journal.Journal evtTypes [1]journal.EventType } @@ -225,7 +223,7 @@ func (m *Miner) mine(ctx context.Context) { onDone(b != nil, nil) if b != nil { - journal.MaybeRecordEvent(m.jrnl, m.evtTypes[evtTypeBlockMined], func() interface{} { + journal.J.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} { return map[string]interface{}{ "parents": base.TipSet.Cids(), "nulls": base.NullRounds, diff --git a/node/builder.go b/node/builder.go index 580aec3df..000099a43 100644 --- a/node/builder.go +++ b/node/builder.go @@ -152,14 +152,18 @@ type Settings struct { func defaults() []Option { return []Option{ + // global system journal. + Override(new(journal.DisabledEvents), journal.DefaultDisabledEvents), + Override(new(journal.Journal), modules.OpenFilesystemJournal), + Override(InitJournalKey, func(j journal.Journal) { + journal.J = j // eagerly sets the global journal through fx.Invoke. + }), + Override(new(helpers.MetricsCtx), context.Background), Override(new(record.Validator), modules.RecordValidator), Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)), Override(new(dtypes.ShutdownChan), make(chan struct{})), - Override(new(journal.Journal), modules.OpenFilesystemJournal), - Override(new(journal.DisabledEvents), journal.DefaultDisabledEvents), - Override(InitJournalKey, func(j journal.Journal) { /* forces the creation of the journal at startup */ }), // Filecoin modules } diff --git a/node/modules/chain.go b/node/modules/chain.go index 7ad4786ca..904c9f23b 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -26,7 +26,6 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" - "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" @@ -48,9 +47,9 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt return exch } -func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, jrnl journal.Journal) (*messagepool.MessagePool, error) { +func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName) (*messagepool.MessagePool, error) { mpp := messagepool.NewProvider(sm, ps) - mp, err := messagepool.New(mpp, ds, nn, jrnl) + mp, err := messagepool.New(mpp, ds, nn) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) } @@ -85,8 +84,8 @@ func ChainBlockservice(bs dtypes.ChainBlockstore, rem dtypes.ChainExchange) dtyp return blockservice.New(bs, rem) } -func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, journal journal.Journal) *store.ChainStore { - chain := store.NewChainStore(bs, ds, syscalls, journal) +func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder) *store.ChainStore { + chain := store.NewChainStore(bs, ds, syscalls) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index fc7458f8f..85ffeb9cc 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -44,11 +44,11 @@ import ( paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" - "github.com/filecoin-project/lotus/journal" + "github.com/filecoin-project/specs-actors/actors/abi" + sectorstorage "github.com/filecoin-project/sector-storage" "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/sector-storage/stores" - "github.com/filecoin-project/specs-actors/actors/abi" sealing "github.com/filecoin-project/storage-fsm" lapi "github.com/filecoin-project/lotus/api" @@ -155,7 +155,6 @@ type StorageMinerParams struct { SectorIDCounter sealing.SectorIDCounter Verifier ffiwrapper.Verifier GetSealingDelayFn dtypes.GetSealingDelayFunc - Journal journal.Journal } func StorageMiner(params StorageMinerParams) (*storage.Miner, error) { @@ -169,7 +168,6 @@ func StorageMiner(params StorageMinerParams) (*storage.Miner, error) { sc = params.SectorIDCounter verif = params.Verifier gsd = params.GetSealingDelayFn - jrnl = params.Journal ) maddr, err := minerAddrFromDS(ds) if err != nil { @@ -188,12 +186,12 @@ func StorageMiner(params StorageMinerParams) (*storage.Miner, error) { return nil, err } - fps, err := storage.NewWindowedPoStScheduler(api, sealer, sealer, maddr, worker, jrnl) + fps, err := storage.NewWindowedPoStScheduler(api, sealer, sealer, maddr, worker) if err != nil { return nil, err } - sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd, jrnl) + sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd) if err != nil { return nil, err } @@ -340,13 +338,13 @@ func StagingGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.Stagi return gs } -func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, jrnl journal.Journal) (*miner.Miner, error) { +func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter) (*miner.Miner, error) { minerAddr, err := minerAddrFromDS(ds) if err != nil { return nil, err } - m := miner.NewMiner(api, epp, minerAddr, sf, jrnl) + m := miner.NewMiner(api, epp, minerAddr, sf) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { diff --git a/storage/miner.go b/storage/miner.go index 2c37e8c4a..b09ee2fc4 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -45,7 +45,6 @@ type Miner struct { getSealDelay dtypes.GetSealingDelayFunc sealing *sealing.Sealing - jrnl journal.Journal sealingEvtType journal.EventType } @@ -94,7 +93,7 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc, jrnl journal.Journal) (*Miner, error) { +func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*Miner, error) { m := &Miner{ api: api, h: h, @@ -107,8 +106,7 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d worker: worker, getSealDelay: gsd, - jrnl: jrnl, - sealingEvtType: jrnl.RegisterEventType("storage", "sealing_states"), + sealingEvtType: journal.J.RegisterEventType("storage", "sealing_states"), } return m, nil @@ -135,7 +133,7 @@ func (m *Miner) Run(ctx context.Context) error { } func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) { - journal.MaybeRecordEvent(m.jrnl, m.sealingEvtType, func() interface{} { + journal.J.RecordEvent(m.sealingEvtType, func() interface{} { return SealingStateEvt{ SectorNumber: before.SectorNumber, SectorType: before.SectorType, diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index c448fb223..7fff0b916 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -26,7 +26,7 @@ import ( var errNoPartitions = errors.New("no partitions") func (s *WindowPoStScheduler) failPost(err error, deadline *miner.DeadlineInfo) { - journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { return s.enrichWithTipset(WindowPoStEvt{ State: "failed", Deadline: s.activeDeadline, @@ -48,7 +48,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli s.abort = abort s.activeDeadline = deadline - journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { return s.enrichWithTipset(WindowPoStEvt{ State: "started", Deadline: s.activeDeadline, @@ -64,7 +64,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli // recordEvent records a successful proofs_processed event in the // journal, even if it was a noop (no partitions). recordEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) { - journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { return s.enrichWithTipset(WindowPoStEvt{ State: "proofs_processed", Deadline: s.activeDeadline, @@ -95,7 +95,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli return } - journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { return s.enrichWithTipset(WindowPoStEvt{ State: "succeeded", Deadline: s.activeDeadline, @@ -159,7 +159,7 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin Recoveries: []miner.RecoveryDeclaration{}, } - defer journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { + defer journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { var mcid cid.Cid if sm != nil { mcid = sm.Cid() @@ -265,7 +265,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, Faults: []miner.FaultDeclaration{}, } - defer journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { + defer journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { var mcid cid.Cid if sm != nil { mcid = sm.Cid() diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 2c55b93ef..c73d320f3 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -66,14 +66,13 @@ type WindowPoStScheduler struct { activeDeadline *miner.DeadlineInfo abort context.CancelFunc - jrnl journal.Journal wdPoStEvtType journal.EventType // failed abi.ChainEpoch // eps // failLk sync.Mutex } -func NewWindowedPoStScheduler(api storageMinerApi, sb storage.Prover, ft sectorstorage.FaultTracker, actor address.Address, worker address.Address, jrnl journal.Journal) (*WindowPoStScheduler, error) { +func NewWindowedPoStScheduler(api storageMinerApi, sb storage.Prover, ft sectorstorage.FaultTracker, actor address.Address, worker address.Address) (*WindowPoStScheduler, error) { mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("getting sector size: %w", err) @@ -93,8 +92,7 @@ func NewWindowedPoStScheduler(api storageMinerApi, sb storage.Prover, ft sectors actor: actor, worker: worker, - jrnl: jrnl, - wdPoStEvtType: jrnl.RegisterEventType("storage", "wdpost"), + wdPoStEvtType: journal.J.RegisterEventType("storage", "wdpost"), }, nil } @@ -248,7 +246,7 @@ func (s *WindowPoStScheduler) abortActivePoSt() { if s.abort != nil { s.abort() - journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { return s.enrichWithTipset(WindowPoStEvt{ State: "abort", Deadline: s.activeDeadline,