unshare the journal

Motivation:

* Run lotus with the race detector enabled (primary motivation).
* Allow multiple lotus nodes in a process (not a high priority).

Previously, the journal was shared between all lotus instances, but it was
initialized for every new node. This caused safety problems in tests (at a
minimum).

This patch explicitly passes the journal to all services that need it.
This commit is contained in:
Steven Allen 2020-10-09 12:52:04 -07:00
parent a5c78371fd
commit 748d2e82a7
32 changed files with 148 additions and 120 deletions

View File

@ -39,6 +39,7 @@ import (
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/repo"
@ -122,6 +123,7 @@ var DefaultRemainderAccountActor = genesis.Actor{
}
func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
j := journal.NilJournal()
// TODO: we really shouldn't modify a global variable here.
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
@ -229,12 +231,12 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
}
genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl)
genb, err := genesis2.MakeGenesisBlock(context.TODO(), j, bs, sys, tpl)
if err != nil {
return nil, xerrors.Errorf("make genesis block failed: %w", err)
}
cs := store.NewChainStore(bs, ds, sys)
cs := store.NewChainStore(bs, ds, sys, j)
genfb := &types.FullBlock{Header: genb.Genesis}
gents := store.NewFullTipSet([]*types.FullBlock{genfb})

View File

@ -7,6 +7,7 @@ import (
"fmt"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/journal"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
@ -466,7 +467,10 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci
return st, nil
}
func MakeGenesisBlock(ctx context.Context, bs bstore.Blockstore, sys vm.SyscallBuilder, template genesis.Template) (*GenesisBootstrap, error) {
func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blockstore, sys vm.SyscallBuilder, template genesis.Template) (*GenesisBootstrap, error) {
if j == nil {
j = journal.NilJournal()
}
st, keyIDs, err := MakeInitialStateTree(ctx, bs, template)
if err != nil {
return nil, xerrors.Errorf("make initial state tree failed: %w", err)
@ -478,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, bs bstore.Blockstore, sys vm.SyscallB
}
// temp chainstore
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys)
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys, j)
// Verify PreSealed Data
stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs)

View File

@ -159,6 +159,7 @@ type MessagePool struct {
sigValCache *lru.TwoQueueCache
evtTypes [3]journal.EventType
journal journal.Journal
}
type msgSet struct {
@ -316,7 +317,7 @@ func (ms *msgSet) getRequiredFunds(nonce uint64) types.BigInt {
return types.BigInt{Int: requiredFunds}
}
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
@ -325,6 +326,10 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
return nil, xerrors.Errorf("error loading mpool config: %w", err)
}
if j == nil {
j = journal.NilJournal()
}
mp := &MessagePool{
ds: ds,
addSema: make(chan struct{}, 1),
@ -344,10 +349,11 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
netName: netName,
cfg: cfg,
evtTypes: [...]journal.EventType{
evtTypeMpoolAdd: journal.J.RegisterEventType("mpool", "add"),
evtTypeMpoolRemove: journal.J.RegisterEventType("mpool", "remove"),
evtTypeMpoolRepub: journal.J.RegisterEventType("mpool", "repub"),
evtTypeMpoolAdd: j.RegisterEventType("mpool", "add"),
evtTypeMpoolRemove: j.RegisterEventType("mpool", "remove"),
evtTypeMpoolRepub: j.RegisterEventType("mpool", "repub"),
},
journal: j,
}
// enable initial prunes
@ -744,7 +750,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
Message: m,
}, localUpdates)
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
mp.journal.RecordEvent(mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
return MessagePoolEvt{
Action: "add",
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}},
@ -865,7 +871,7 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool)
Message: m,
}, localUpdates)
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
mp.journal.RecordEvent(mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
return MessagePoolEvt{
Action: "remove",
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}}}

View File

@ -225,7 +225,7 @@ func TestMessagePool(t *testing.T) {
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -266,7 +266,7 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) {
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -315,7 +315,7 @@ func TestRevertMessages(t *testing.T) {
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -378,7 +378,7 @@ func TestPruningSimple(t *testing.T) {
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -422,7 +422,7 @@ func TestLoadLocal(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -465,7 +465,7 @@ func TestLoadLocal(t *testing.T) {
t.Fatal(err)
}
mp, err = New(tma, ds, "mptest")
mp, err = New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -494,7 +494,7 @@ func TestClearAll(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -548,7 +548,7 @@ func TestClearNonLocal(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -609,7 +609,7 @@ func TestUpdates(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}

View File

@ -11,7 +11,6 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/ipfs/go-cid"
)
@ -148,7 +147,7 @@ loop:
}
if len(msgs) > 0 {
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
mp.journal.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
msgsEv := make([]MessagePoolEvtMessage, 0, len(msgs))
for _, m := range msgs {
msgsEv = append(msgsEv, MessagePoolEvtMessage{Message: m.Message, CID: m.Cid()})

View File

@ -21,7 +21,7 @@ func TestRepubMessages(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest")
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}

View File

@ -58,7 +58,7 @@ func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, g
func makeTestMpool() (*MessagePool, *testMpoolAPI) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "test")
mp, err := New(tma, ds, "test", nil)
if err != nil {
panic(err)
}

View File

@ -31,7 +31,7 @@ func TestIndexSeeks(t *testing.T) {
ctx := context.TODO()
nbs := blockstore.NewTemporarySync()
cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil)
cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
_, err = cs.Import(bytes.NewReader(gencar))
if err != nil {

View File

@ -124,11 +124,15 @@ type ChainStore struct {
vmcalls vm.SyscallBuilder
evtTypes [1]journal.EventType
journal journal.Journal
}
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
if j == nil {
j = journal.NilJournal()
}
cs := &ChainStore{
bs: bs,
ds: ds,
@ -137,10 +141,11 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB
mmCache: c,
tsCache: tsc,
vmcalls: vmcalls,
journal: j,
}
cs.evtTypes = [1]journal.EventType{
evtTypeHeadChange: journal.J.RegisterEventType("sync", "head_change"),
evtTypeHeadChange: j.RegisterEventType("sync", "head_change"),
}
ci := NewChainIndex(cs.LoadTipSet)
@ -379,7 +384,7 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
continue
}
journal.J.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
cs.journal.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
return HeadChangeEvt{
From: r.old.Key(),
FromHeight: r.old.Height(),

View File

@ -62,7 +62,7 @@ func BenchmarkGetRandomness(b *testing.B) {
bs := blockstore.NewBlockstore(bds)
cs := store.NewChainStore(bs, mds, nil)
cs := store.NewChainStore(bs, mds, nil, nil)
b.ResetTimer()
@ -96,7 +96,7 @@ func TestChainExportImport(t *testing.T) {
}
nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil)
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil)
root, err := cs.Import(buf)
if err != nil {

View File

@ -183,7 +183,7 @@ var importBenchCmd = &cli.Command{
return nil
}
cs := store.NewChainStore(bs, ds, vm.Syscalls(verifier))
cs := store.NewChainStore(bs, ds, vm.Syscalls(verifier), nil)
stm := stmgr.NewStateManager(cs)
if cctx.Bool("global-profile") {

View File

@ -173,7 +173,7 @@ var chainBalanceStateCmd = &cli.Command{
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
@ -343,7 +343,7 @@ var chainPledgeCmd = &cli.Command{
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)

View File

@ -83,7 +83,7 @@ var exportChainCmd = &cli.Command{
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, nil)
cs := store.NewChainStore(bs, mds, nil, nil)
if err := cs.Load(); err != nil {
return err
}

View File

@ -52,7 +52,7 @@ var genesisVerifyCmd = &cli.Command{
}
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), nil)
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), nil, nil)
cf := cctx.Args().Get(0)
f, err := os.Open(cf)

View File

@ -162,7 +162,7 @@ var stateTreePruneCmd = &cli.Command{
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err)
}

View File

@ -464,13 +464,12 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err
}
if jrnl, err := journal.OpenFSJournal(lr, journal.DefaultDisabledEvents); err == nil {
journal.J = jrnl
} else {
j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents())
if err != nil {
return fmt.Errorf("failed to open filesystem journal: %w", err)
}
m := miner.NewMiner(api, epp, a, slashfilter.New(mds))
m := miner.NewMiner(api, epp, a, slashfilter.New(mds), j)
{
if err := m.Start(ctx); err != nil {
return xerrors.Errorf("failed to start up genesis miner: %w", err)

View File

@ -34,6 +34,7 @@ import (
"github.com/filecoin-project/lotus/chain/vm"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/lib/ulimit"
@ -410,7 +411,11 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
bs := blockstore.NewBlockstore(ds)
cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents())
if err != nil {
return xerrors.Errorf("failed to open journal: %w", err)
}
cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
log.Infof("importing chain from %s...", fname)

View File

@ -87,7 +87,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot
syscalls = vm.Syscalls(ffiwrapper.ProofVerifier)
vmRand = NewFixedRand()
cs = store.NewChainStore(bs, ds, syscalls)
cs = store.NewChainStore(bs, ds, syscalls, nil)
sm = stmgr.NewStateManager(cs)
)

19
journal/env.go Normal file
View File

@ -0,0 +1,19 @@
package journal
import (
"os"
)
// envJournalDisabledEvents is the environment variable through which disabled
// journal events can be customized.
const envDisabledEvents = "LOTUS_JOURNAL_DISABLED_EVENTS"
func EnvDisabledEvents() DisabledEvents {
if env, ok := os.LookupEnv(envDisabledEvents); ok {
if ret, err := ParseDisabledEvents(env); err == nil {
return ret
}
}
// fallback if env variable is not set, or if it failed to parse.
return DefaultDisabledEvents
}

View File

@ -1,9 +0,0 @@
package journal
var (
// J is a globally accessible Journal. It starts being NilJournal, and early
// during the Lotus initialization routine, it is reset to whichever Journal
// is configured (by default, the filesystem journal). Components can safely
// record in the journal by calling: journal.J.RecordEvent(...).
J Journal = NilJournal() // nolint
)

View File

@ -28,9 +28,9 @@ type RetrievalProviderEvt struct {
}
// StorageClientJournaler records journal events from the storage client.
func StorageClientJournaler(evtType journal.EventType) func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
func StorageClientJournaler(j journal.Journal, evtType journal.EventType) func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
return func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
journal.J.RecordEvent(evtType, func() interface{} {
j.RecordEvent(evtType, func() interface{} {
return StorageClientEvt{
Event: storagemarket.ClientEvents[event],
Deal: deal,
@ -40,9 +40,9 @@ func StorageClientJournaler(evtType journal.EventType) func(event storagemarket.
}
// StorageProviderJournaler records journal events from the storage provider.
func StorageProviderJournaler(evtType journal.EventType) func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
func StorageProviderJournaler(j journal.Journal, evtType journal.EventType) func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
return func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
journal.J.RecordEvent(evtType, func() interface{} {
j.RecordEvent(evtType, func() interface{} {
return StorageProviderEvt{
Event: storagemarket.ProviderEvents[event],
Deal: deal,
@ -52,9 +52,9 @@ func StorageProviderJournaler(evtType journal.EventType) func(event storagemarke
}
// RetrievalClientJournaler records journal events from the retrieval client.
func RetrievalClientJournaler(evtType journal.EventType) func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
func RetrievalClientJournaler(j journal.Journal, evtType journal.EventType) func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
return func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
journal.J.RecordEvent(evtType, func() interface{} {
j.RecordEvent(evtType, func() interface{} {
return RetrievalClientEvt{
Event: retrievalmarket.ClientEvents[event],
Deal: deal,
@ -64,9 +64,9 @@ func RetrievalClientJournaler(evtType journal.EventType) func(event retrievalmar
}
// RetrievalProviderJournaler records journal events from the retrieval provider.
func RetrievalProviderJournaler(evtType journal.EventType) func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
func RetrievalProviderJournaler(j journal.Journal, evtType journal.EventType) func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
return func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
journal.J.RecordEvent(evtType, func() interface{} {
j.RecordEvent(evtType, func() interface{} {
return RetrievalProviderEvt{
Event: retrievalmarket.ProviderEvents[event],
Deal: deal,

View File

@ -49,7 +49,7 @@ func randTimeOffset(width time.Duration) time.Duration {
return val - (width / 2)
}
func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter) *Miner {
func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter, j journal.Journal) *Miner {
arc, err := lru.NewARC(10000)
if err != nil {
panic(err)
@ -74,8 +74,9 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address,
sf: sf,
minedBlockHeights: arc,
evtTypes: [...]journal.EventType{
evtTypeBlockMined: journal.J.RegisterEventType("miner", "block_mined"),
evtTypeBlockMined: j.RegisterEventType("miner", "block_mined"),
},
journal: j,
}
}
@ -97,6 +98,7 @@ type Miner struct {
minedBlockHeights *lru.ARCCache
evtTypes [1]journal.EventType
journal journal.Journal
}
func (m *Miner) Address() address.Address {
@ -239,7 +241,7 @@ minerLoop:
onDone(b != nil, h, nil)
if b != nil {
journal.J.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} {
m.journal.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} {
return map[string]interface{}{
"parents": base.TipSet.Cids(),
"nulls": base.NullRounds,

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/journal"
)
type MineReq struct {
@ -32,6 +33,7 @@ func NewTestMiner(nextCh <-chan MineReq, addr address.Address) func(api.FullNode
minedBlockHeights: arc,
address: addr,
sf: slashfilter.New(ds.NewMapDatastore()),
journal: journal.NilJournal(),
}
if err := m.Start(context.TODO()); err != nil {

View File

@ -3,7 +3,6 @@ package node
import (
"context"
"errors"
"os"
"time"
"github.com/filecoin-project/lotus/chain"
@ -73,10 +72,6 @@ import (
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
// EnvJournalDisabledEvents is the environment variable through which disabled
// journal events can be customized.
const EnvJournalDisabledEvents = "LOTUS_JOURNAL_DISABLED_EVENTS"
//nolint:deadcode,varcheck
var log = logging.Logger("builder")
@ -166,19 +161,8 @@ type Settings struct {
func defaults() []Option {
return []Option{
// global system journal.
Override(new(journal.DisabledEvents), func() journal.DisabledEvents {
if env, ok := os.LookupEnv(EnvJournalDisabledEvents); ok {
if ret, err := journal.ParseDisabledEvents(env); err == nil {
return ret
}
}
// fallback if env variable is not set, or if it failed to parse.
return journal.DefaultDisabledEvents
}),
Override(new(journal.DisabledEvents), journal.EnvDisabledEvents),
Override(new(journal.Journal), modules.OpenFilesystemJournal),
Override(InitJournalKey, func(j journal.Journal) {
journal.J = j // eagerly sets the global journal through fx.Invoke.
}),
Override(new(helpers.MetricsCtx), context.Background),
Override(new(record.Validator), modules.RecordValidator),

View File

@ -17,6 +17,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
@ -51,9 +52,9 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r
return exch
}
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName) (*messagepool.MessagePool, error) {
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
mpp := messagepool.NewProvider(sm, ps)
mp, err := messagepool.New(mpp, ds, nn)
mp, err := messagepool.New(mpp, ds, nn, j)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
}
@ -88,8 +89,8 @@ func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtype
return blockservice.New(bs, rem)
}
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder) *store.ChainStore {
chain := store.NewChainStore(bs, ds, syscalls)
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, ds, syscalls, j)
if err := chain.Load(); err != nil {
log.Warnf("loading chain state from disk: %s", err)

View File

@ -113,7 +113,7 @@ func NewClientDealFunds(ds dtypes.MetadataDS) (ClientDealFunds, error) {
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/client"))
}
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, dealFunds ClientDealFunds) (storagemarket.StorageClient, error) {
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, dealFunds ClientDealFunds, j journal.Journal) (storagemarket.StorageClient, error) {
net := smnet.NewFromLibp2pHost(h)
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, dealFunds, storageimpl.DealPollingInterval(time.Second))
if err != nil {
@ -124,8 +124,8 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
OnStart: func(ctx context.Context) error {
c.SubscribeToEvents(marketevents.StorageClientLogger)
evtType := journal.J.RegisterEventType("markets/storage/client", "state_change")
c.SubscribeToEvents(markets.StorageClientJournaler(evtType))
evtType := j.RegisterEventType("markets/storage/client", "state_change")
c.SubscribeToEvents(markets.StorageClientJournaler(j, evtType))
return c.Start(ctx)
},
@ -137,7 +137,7 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
}
// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI) (retrievalmarket.RetrievalClient, error) {
func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI)
network := rmnet.NewFromLibp2pHost(h)
sc := storedcounter.New(ds, datastore.NewKey("/retr"))
@ -150,8 +150,8 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
OnStart: func(ctx context.Context) error {
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
evtType := journal.J.RegisterEventType("markets/retrieval/client", "state_change")
client.SubscribeToEvents(markets.RetrievalClientJournaler(evtType))
evtType := j.RegisterEventType("markets/retrieval/client", "state_change")
client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType))
return client.Start(ctx)
},

View File

@ -161,6 +161,7 @@ type StorageMinerParams struct {
SectorIDCounter sealing.SectorIDCounter
Verifier ffiwrapper.Verifier
GetSealingConfigFn dtypes.GetSealingConfigFunc
Journal journal.Journal
}
func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) {
@ -175,6 +176,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
sc = params.SectorIDCounter
verif = params.Verifier
gsd = params.GetSealingConfigFn
j = params.Journal
)
maddr, err := minerAddrFromDS(ds)
@ -194,12 +196,12 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
return nil, err
}
fps, err := storage.NewWindowedPoStScheduler(api, fc, sealer, sealer, maddr, worker)
fps, err := storage.NewWindowedPoStScheduler(api, fc, sealer, sealer, j, maddr, worker)
if err != nil {
return nil, err
}
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd, fc)
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd, fc, j)
if err != nil {
return nil, err
}
@ -216,15 +218,15 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
}
}
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) {
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider, j journal.Journal) {
m.OnReady(marketevents.ReadyLogger("retrieval provider"))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
m.SubscribeToEvents(marketevents.RetrievalProviderLogger)
evtType := journal.J.RegisterEventType("markets/retrieval/provider", "state_change")
m.SubscribeToEvents(markets.RetrievalProviderJournaler(evtType))
evtType := j.RegisterEventType("markets/retrieval/provider", "state_change")
m.SubscribeToEvents(markets.RetrievalProviderJournaler(j, evtType))
return m.Start(ctx)
},
@ -234,15 +236,15 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva
})
}
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h storagemarket.StorageProvider) {
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h storagemarket.StorageProvider, j journal.Journal) {
ctx := helpers.LifecycleCtx(mctx, lc)
h.OnReady(marketevents.ReadyLogger("storage provider"))
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
h.SubscribeToEvents(marketevents.StorageProviderLogger)
evtType := journal.J.RegisterEventType("markets/storage/provider", "state_change")
h.SubscribeToEvents(markets.StorageProviderJournaler(evtType))
evtType := j.RegisterEventType("markets/storage/provider", "state_change")
h.SubscribeToEvents(markets.StorageProviderJournaler(j, evtType))
return h.Start(ctx)
},
@ -354,13 +356,13 @@ func StagingGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.Stagi
return gs
}
func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter) (*miner.Miner, error) {
func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, j journal.Journal) (*miner.Miner, error) {
minerAddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
}
m := miner.NewMiner(api, epp, minerAddr, sf)
m := miner.NewMiner(api, epp, minerAddr, sf, j)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {

View File

@ -23,17 +23,18 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
var glog = logging.Logger("genesis")
func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func() (*types.BlockHeader, error) {
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template)
b, err := genesis2.MakeGenesisBlock(context.TODO(), j, bs, syscalls, template)
if err != nil {
return nil, xerrors.Errorf("make genesis block failed: %w", err)
}
@ -50,8 +51,8 @@ func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.Cha
}
}
func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func() (*types.BlockHeader, error) {
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
genesisTemplate, err := homedir.Expand(genesisTemplate)
@ -73,7 +74,7 @@ func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore
template.Timestamp = uint64(build.Clock.Now().Unix())
}
b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template)
b, err := genesis2.MakeGenesisBlock(context.TODO(), j, bs, syscalls, template)
if err != nil {
return nil, xerrors.Errorf("make genesis block: %w", err)
}

View File

@ -57,6 +57,8 @@ type Miner struct {
sealing *sealing.Sealing
sealingEvtType journal.EventType
journal journal.Journal
}
// SealingStateEvt is a journal event that records a sector state transition.
@ -110,7 +112,7 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig) (*Miner, error) {
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal) (*Miner, error) {
m := &Miner{
api: api,
feeCfg: feeCfg,
@ -123,7 +125,8 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
maddr: maddr,
worker: worker,
getSealConfig: gsd,
sealingEvtType: journal.J.RegisterEventType("storage", "sealing_states"),
journal: journal,
sealingEvtType: journal.RegisterEventType("storage", "sealing_states"),
}
return m, nil
@ -156,7 +159,7 @@ func (m *Miner) Run(ctx context.Context) error {
}
func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) {
journal.J.RecordEvent(m.sealingEvtType, func() interface{} {
m.journal.RecordEvent(m.sealingEvtType, func() interface{} {
return SealingStateEvt{
SectorNumber: before.SectorNumber,
SectorType: before.SectorType,

View File

@ -26,11 +26,10 @@ import (
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
)
func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
c := evtCommon{Error: err}
if ts != nil {
c.Deadline = deadline
@ -54,7 +53,7 @@ func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dl
// recordProofsEvent records a successful proofs_processed event in the
// journal, even if it was a noop (no partitions).
func (s *WindowPoStScheduler) recordProofsEvent(partitions []miner.PoStPartition, mcid cid.Cid) {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
return &WdPoStProofsProcessedEvt{
evtCommon: s.getEvtCommon(nil),
Partitions: partitions,
@ -74,7 +73,7 @@ func (s *WindowPoStScheduler) startGeneratePoST(
go func() {
defer abort()
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return WdPoStSchedulerEvt{
evtCommon: s.getEvtCommon(nil),
State: SchedulerStateStarted,
@ -125,7 +124,7 @@ func (s *WindowPoStScheduler) startSubmitPoST(
err := s.runSubmitPoST(ctx, ts, deadline, posts)
if err == nil {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return WdPoStSchedulerEvt{
evtCommon: s.getEvtCommon(nil),
State: SchedulerStateSucceeded,
@ -439,7 +438,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
log.Errorf("checking sector recoveries: %v", err)
}
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
j := WdPoStRecoveriesProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: recoveries,
@ -458,7 +457,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
log.Errorf("checking sector faults: %v", err)
}
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
return WdPoStFaultsProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: faults,

View File

@ -25,6 +25,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
)
type mockStorageMinerAPI struct {
@ -168,6 +169,7 @@ func TestWDPostDoPost(t *testing.T) {
proofType: proofType,
actor: postAct,
worker: workerAct,
journal: journal.NilJournal(),
}
di := &dline.Info{

View File

@ -35,12 +35,13 @@ type WindowPoStScheduler struct {
worker address.Address
evtTypes [4]journal.EventType
journal journal.Journal
// failed abi.ChainEpoch // eps
// failLk sync.Mutex
}
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, actor address.Address, worker address.Address) (*WindowPoStScheduler, error) {
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address, worker address.Address) (*WindowPoStScheduler, error) {
mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
@ -62,11 +63,12 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb
actor: actor,
worker: worker,
evtTypes: [...]journal.EventType{
evtTypeWdPoStScheduler: journal.J.RegisterEventType("wdpost", "scheduler"),
evtTypeWdPoStProofs: journal.J.RegisterEventType("wdpost", "proofs_processed"),
evtTypeWdPoStRecoveries: journal.J.RegisterEventType("wdpost", "recoveries_processed"),
evtTypeWdPoStFaults: journal.J.RegisterEventType("wdpost", "faults_processed"),
evtTypeWdPoStScheduler: j.RegisterEventType("wdpost", "scheduler"),
evtTypeWdPoStProofs: j.RegisterEventType("wdpost", "proofs_processed"),
evtTypeWdPoStRecoveries: j.RegisterEventType("wdpost", "recoveries_processed"),
evtTypeWdPoStFaults: j.RegisterEventType("wdpost", "faults_processed"),
},
journal: j,
}, nil
}
@ -166,7 +168,7 @@ func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.T
// onAbort is called when generating proofs or submitting proofs is aborted
func (s *WindowPoStScheduler) onAbort(ts *types.TipSet, deadline *dline.Info) {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
c := evtCommon{}
if ts != nil {
c.Deadline = deadline