mpool: persist local messages

Łukasz Magiera 2019-11-23 20:01:56 +01:00
parent 6af2e946d1
commit 43a5172944
3 changed files with 83 additions and 13 deletions
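This change makes the message pool remember messages submitted by the local node: addLocal now persists each signed message into a /mpool/local namespace of the metadata datastore, keyed by the message CID, and loadLocal replays those entries into the pool on startup. The round trip can be illustrated with a minimal standalone sketch (not lotus code); it assumes an in-memory map datastore as a stand-in for the node's metadata datastore, an opaque "msg-1" key in place of real CID bytes, and the 2019-era go-datastore API used by this commit, where Put and Query take no context argument:

package main

import (
	"fmt"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	"github.com/ipfs/go-datastore/query"
	dssync "github.com/ipfs/go-datastore/sync"
)

func main() {
	// In-memory stand-in for the node's metadata datastore.
	base := dssync.MutexWrap(datastore.NewMapDatastore())

	// Same namespacing as the mpool: local messages live under /mpool/local.
	local := namespace.Wrap(base, datastore.NewKey("/mpool/local"))

	// Persist a (stand-in) serialized message under an opaque key,
	// analogous to keying by the message CID bytes in addLocal.
	if err := local.Put(datastore.NewKey("msg-1"), []byte("serialized-signed-message")); err != nil {
		panic(err)
	}

	// Reload everything, as loadLocal does with an empty query on startup.
	res, err := local.Query(query.Query{})
	if err != nil {
		panic(err)
	}
	for r := range res.Next() {
		if r.Error != nil {
			panic(r.Error)
		}
		fmt.Printf("%s => %q\n", r.Key, r.Value)
	}
}

Because the key is derived from the message CID, persisting the same message twice simply overwrites the same datastore entry.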

View File

@@ -1,6 +1,7 @@
package chain
import (
"bytes"
"context"
"errors"
"sort"
@@ -8,6 +9,9 @@ import (
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
pubsub "github.com/libp2p/go-libp2p-pubsub"
lps "github.com/whyrusleeping/pubsub"
"go.uber.org/multierr"
@@ -18,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
var (
@@ -35,6 +40,8 @@ var (
const (
msgTopic = "/fil/messages"
localMsgsDs = "/mpool/local"
localUpdates = "update"
)
@@ -60,6 +67,8 @@ type MessagePool struct {
blsSigCache *lru.TwoQueueCache
changes *lps.PubSub
localMsgs datastore.Datastore
}
type msgSet struct {
@@ -89,7 +98,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
return nil
}
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
mp := &MessagePool{
closer: make(chan struct{}),
@@ -102,7 +111,15 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
maxTxPoolSize: 5000,
blsSigCache: cache,
changes: lps.New(50),
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
}
if err := mp.loadLocal(); err != nil {
return nil, xerrors.Errorf("loading local messages: %w", err)
}
go mp.repubLocal()
sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(rev, app)
if err != nil {
@@ -111,7 +128,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
return err
})
return mp
return mp, nil
}
func (mp *MessagePool) Close() error {
@@ -134,13 +151,13 @@ func (mp *MessagePool) repubLocal() {
for _, msg := range msgs {
msgb, err := msg.Serialize()
if err != nil {
multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
continue
}
err = mp.ps.Publish(msgTopic, msgb)
if err != nil {
multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
continue
}
}
@@ -156,8 +173,14 @@ func (mp *MessagePool) repubLocal() {
}
func (mp *MessagePool) addLocal(a address.Address) {
mp.localAddrs[a] = struct{}{}
func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
mp.localAddrs[m.Message.From] = struct{}{}
if err := mp.localMsgs.Put(datastore.NewKey(string(m.Cid().Bytes())), msgb); err != nil {
return xerrors.Errorf("persisting local message: %w", err)
}
return nil
}
func (mp *MessagePool) Push(m *types.SignedMessage) error {
@@ -171,7 +194,10 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
}
mp.lk.Lock()
mp.addLocal(m.Message.From)
if err := mp.addLocal(m, msgb); err != nil {
mp.lk.Unlock()
return err
}
mp.lk.Unlock()
return mp.ps.Publish(msgTopic, msgb)
@@ -237,7 +263,9 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
mp.pending[m.Message.From] = mset
}
mset.add(m)
if err := mset.add(m); err != nil {
log.Error(err)
}
mp.changes.Pub(api.MpoolUpdate{
Type: api.MpoolAdd,
@@ -313,7 +341,7 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ
if err := mp.addLocked(msg); err != nil {
return nil, err
}
mp.addLocal(msg.Message.From)
if err := mp.addLocal(msg, msgb); err != nil {
return nil, err
}
return msg, mp.ps.Publish(msgTopic, msgb)
}
@@ -466,3 +494,31 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err
return out, nil
}
func (mp *MessagePool) loadLocal() error {
res, err := mp.localMsgs.Query(query.Query{})
if err != nil {
return xerrors.Errorf("query local messages: %w", err)
}
for r := range res.Next() {
if r.Error != nil {
return xerrors.Errorf("r.Error: %w", r.Error)
}
var sm types.SignedMessage
if err := sm.UnmarshalCBOR(bytes.NewReader(r.Value)); err != nil {
return xerrors.Errorf("unmarshaling local message: %w", err)
}
if err := mp.Add(&sm); err != nil {
if xerrors.Is(err, ErrNonceTooLow) {
continue // todo: drop the message from the local cache (above a certain confidence threshold)
}
return xerrors.Errorf("adding local messgae: %w", err)
}
}
return nil
}
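The todo in loadLocal leaves a gap open: messages rejected with ErrNonceTooLow are skipped but never removed from the datastore, so they are re-parsed on every restart. A hypothetical follow-up (not part of this commit) could delete the stale entry using the same CID-derived key; a sketch, assuming a dropLocal helper name and the same pre-context go-datastore API:

// Hypothetical helper (not in this commit): forget a persisted local
// message once it is known to be stale, e.g. its nonce was already applied.
func (mp *MessagePool) dropLocal(sm *types.SignedMessage) error {
	key := datastore.NewKey(string(sm.Cid().Bytes()))
	if err := mp.localMsgs.Delete(key); err != nil {
		return xerrors.Errorf("deleting local message: %w", err)
	}
	return nil
}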

View File

@ -136,7 +136,18 @@ var mpoolStat = &cli.Command{
cur++
}
fmt.Printf("%s, cur %d\n", a, cur-act.Nonce)
past := 0
future := 0
for _, m := range bkt.msgs {
if m.Message.Nonce < act.Nonce {
past++
}
if m.Message.Nonce > cur {
future++
}
}
fmt.Printf("%s, past: %d, cur: %d, future: %d\n", a, past, cur-act.Nonce, future)
}
return nil
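To make the new counters concrete, here is a toy walk-through (illustration only, not lotus code) of what past, cur and future mean for one address: with an actor nonce of 5 and pending message nonces {3, 5, 6, 8}, the consecutive run 5, 6 counts toward cur, the already-used nonce 3 counts as past, and the gapped nonce 8 counts as future:

package main

import "fmt"

func main() {
	actNonce := uint64(5)
	pending := map[uint64]bool{3: true, 5: true, 6: true, 8: true}

	// cur advances over the consecutive run starting at the actor nonce.
	cur := actNonce
	for pending[cur] {
		cur++
	}

	past, future := 0, 0
	for nonce := range pending {
		if nonce < actNonce {
			past++
		}
		if nonce > cur {
			future++
		}
	}

	// Prints: past: 1, cur: 2, future: 1
	fmt.Printf("past: %d, cur: %d, future: %d\n", past, cur-actNonce, future)
}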

View File

@ -41,14 +41,17 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt
return exch
}
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub) *chain.MessagePool {
mp := chain.NewMessagePool(sm, ps)
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*chain.MessagePool, error) {
mp, err := chain.NewMessagePool(sm, ps, ds)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return mp.Close()
},
})
return mp
return mp, nil
}
func ChainBlockstore(r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
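Because chain.NewMessagePool can now fail, the fx provider above returns (*chain.MessagePool, error); fx aborts dependency construction when a provided constructor returns a non-nil error. A minimal, self-contained sketch of the same pattern (not lotus code; the pool type and newPool name are placeholders):

package main

import (
	"context"
	"fmt"

	"go.uber.org/fx"
)

type pool struct{}

func (p *pool) Close() error { fmt.Println("pool closed"); return nil }

// newPool mirrors the provider in the diff: it may return an error,
// and it registers a shutdown hook via the injected fx.Lifecycle.
func newPool(lc fx.Lifecycle) (*pool, error) {
	p := &pool{}
	lc.Append(fx.Hook{
		OnStop: func(_ context.Context) error { return p.Close() },
	})
	return p, nil
}

func main() {
	app := fx.New(
		fx.Provide(newPool),
		fx.Invoke(func(*pool) { fmt.Println("pool ready") }),
	)
	ctx := context.Background()
	if err := app.Start(ctx); err != nil {
		panic(err)
	}
	_ = app.Stop(ctx)
}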