add mpool journal events; fix dependency injection.

This commit is contained in:
Raúl Kripalani 2020-07-20 16:37:24 +01:00
parent 226786c1da
commit 7459ec6bba
4 changed files with 58 additions and 6 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
@ -65,6 +66,20 @@ const (
localUpdates = "update" localUpdates = "update"
) )
// Journal event types.
const (
evtTypeMpoolAdd = iota
evtTypeMpoolRemove
evtTypeMpoolRepub
)
// MessagePoolEvt is the journal event type emitted by the MessagePool.
type MessagePoolEvt struct {
Action string
MessageCIDs []cid.Cid
Error error `json:",omitempty"`
}
type MessagePool struct { type MessagePool struct {
lk sync.Mutex lk sync.Mutex
@ -93,6 +108,9 @@ type MessagePool struct {
netName dtypes.NetworkName netName dtypes.NetworkName
sigValCache *lru.TwoQueueCache sigValCache *lru.TwoQueueCache
jrnl journal.Journal
evtTypes [3]journal.EventType
} }
type msgSet struct { type msgSet struct {
@ -185,7 +203,7 @@ func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
return mpp.sm.ChainStore().LoadTipSet(tsk) return mpp.sm.ChainStore().LoadTipSet(tsk)
} }
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) { func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, jrnl journal.Journal) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize) cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q(build.VerifSigCacheSize) verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
@ -202,6 +220,12 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)), localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
api: api, api: api,
netName: netName, netName: netName,
jrnl: jrnl,
evtTypes: [...]journal.EventType{
evtTypeMpoolAdd: jrnl.RegisterEventType("mpool", "add"),
evtTypeMpoolRemove: jrnl.RegisterEventType("mpool", "remove"),
evtTypeMpoolRepub: jrnl.RegisterEventType("mpool", "repub"),
},
} }
if err := mp.loadLocal(); err != nil { if err := mp.loadLocal(); err != nil {
@ -284,6 +308,19 @@ func (mp *MessagePool) repubLocal() {
if errout != nil { if errout != nil {
log.Errorf("errors while republishing: %+v", errout) log.Errorf("errors while republishing: %+v", errout)
} }
journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
cids := make([]cid.Cid, 0, len(outputMsgs))
for _, m := range outputMsgs {
cids = append(cids, m.Cid())
}
return MessagePoolEvt{
Action: "repub",
MessageCIDs: cids,
Error: errout,
}
})
case <-mp.closer: case <-mp.closer:
mp.repubTk.Stop() mp.repubTk.Stop()
return return
@ -448,6 +485,14 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
Type: api.MpoolAdd, Type: api.MpoolAdd,
Message: m, Message: m,
}, localUpdates) }, localUpdates)
journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
return MessagePoolEvt{
Action: "add",
MessageCIDs: []cid.Cid{m.Cid()},
}
})
return nil return nil
} }
@ -580,6 +625,13 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
Type: api.MpoolRemove, Type: api.MpoolRemove,
Message: m, Message: m,
}, localUpdates) }, localUpdates)
journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
return MessagePoolEvt{
Action: "remove",
MessageCIDs: []cid.Cid{m.Cid()},
}
})
} }
// NB: This deletes any message with the given nonce. This makes sense // NB: This deletes any message with the given nonce. This makes sense

View File

@ -147,7 +147,7 @@ func TestMessagePool(t *testing.T) {
ds := datastore.NewMapDatastore() ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest") mp, err := New(tma, ds, "mptest", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -188,7 +188,7 @@ func TestRevertMessages(t *testing.T) {
ds := datastore.NewMapDatastore() ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "mptest") mp, err := New(tma, ds, "mptest", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -155,7 +155,7 @@ func defaults() []Option {
Override(new(journal.Journal), journal.OpenFSJournal), Override(new(journal.Journal), journal.OpenFSJournal),
Override(new(journal.DisabledEvents), journal.DisabledEvents{}), Override(new(journal.DisabledEvents), journal.DisabledEvents{}),
Override(InitJournalKey, func(j *journal.Journal) { /* forces the creation of the journal at startup */ }), Override(InitJournalKey, func(j journal.Journal) { /* forces the creation of the journal at startup */ }),
// Filecoin modules // Filecoin modules
} }

View File

@ -47,9 +47,9 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt
return exch return exch
} }
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName) (*messagepool.MessagePool, error) { func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, jrnl journal.Journal) (*messagepool.MessagePool, error) {
mpp := messagepool.NewProvider(sm, ps) mpp := messagepool.NewProvider(sm, ps)
mp, err := messagepool.New(mpp, ds, nn) mp, err := messagepool.New(mpp, ds, nn, jrnl)
if err != nil { if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err) return nil, xerrors.Errorf("constructing mpool: %w", err)
} }