diff --git a/chain/messagepool.go b/chain/messagepool.go index 428b81241..c9a4ace27 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -2,10 +2,12 @@ package chain import ( "sync" + "time" lru "github.com/hashicorp/golang-lru" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/pkg/errors" + "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" @@ -26,9 +28,18 @@ var ( ErrInvalidToAddr = errors.New("message had invalid to address") ) +const ( + msgTopic = "/fil/messages" +) + type MessagePool struct { lk sync.Mutex + closer chan struct{} + repubTk *time.Ticker + + localAddrs map[address.Address]struct{} + pending map[address.Address]*msgSet pendingCount int @@ -73,6 +84,9 @@ func (ms *msgSet) add(m *types.SignedMessage) error { func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) mp := &MessagePool{ + closer: make(chan struct{}), + repubTk: time.NewTicker(2 * time.Minute), + localAddrs: make(map[address.Address]struct{}), pending: make(map[address.Address]*msgSet), sm: sm, ps: ps, @@ -91,6 +105,52 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool { return mp } +func (mp *MessagePool) Close() error { + close(mp.closer) + return nil +} + +func (mp *MessagePool) repubLocal() { + for { + select { + case <-mp.repubTk.C: + mp.lk.Lock() + msgs := make([]*types.SignedMessage, 0) + for a := range mp.localAddrs { + msgs = append(msgs, mp.pendingFor(a)...) + } + mp.lk.Unlock() + + var errout error + for _, msg := range msgs { + msgb, err := msg.Serialize() + if err != nil { + multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err)) + continue + } + + err = mp.ps.Publish(msgTopic, msgb) + if err != nil { + multierr.Append(errout, xerrors.Errorf("could not publish: %w", err)) + continue + } + } + + if errout != nil { + log.Errorf("errors while republishing: %+v", errout) + } + case <-mp.closer: + mp.repubTk.Stop() + return + } + } + +} + +func (mp *MessagePool) addLocal(a address.Address) { + mp.localAddrs[a] = struct{}{} +} + func (mp *MessagePool) Push(m *types.SignedMessage) error { msgb, err := m.Serialize() if err != nil { @@ -101,7 +161,11 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error { return err } - return mp.ps.Publish("/fil/messages", msgb) + mp.lk.Lock() + mp.addLocal(m.Message.From) + mp.lk.Unlock() + + return mp.ps.Publish(msgTopic, msgb) } func (mp *MessagePool) Add(m *types.SignedMessage) error { @@ -224,8 +288,9 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ if err := mp.addLocked(msg); err != nil { return nil, err } + mp.addLocal(msg.Message.From) - return msg, mp.ps.Publish("/fil/messages", msgb) + return msg, mp.ps.Publish(msgTopic, msgb) } func (mp *MessagePool) Remove(from address.Address, nonce uint64) { @@ -259,24 +324,28 @@ func (mp *MessagePool) Pending() []*types.SignedMessage { mp.lk.Lock() defer mp.lk.Unlock() out := make([]*types.SignedMessage, 0) - for _, mset := range mp.pending { - if len(mset.msgs) == 0 { - continue - } - - set := make([]*types.SignedMessage, len(mset.msgs)) - var i uint64 - - for i = mset.nextNonce - 1; mset.msgs[i] != nil; i-- { - set[len(mset.msgs)-int(mset.nextNonce-i)] = mset.msgs[i] - } - - out = append(out, set[len(mset.msgs)-int(mset.nextNonce-i-1):]...) + for a := range mp.pending { + out = append(out, mp.pendingFor(a)...) } return out } +func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { + mset := mp.pending[a] + if mset == nil || len(mset.msgs) == 0 { + return nil + } + + set := make([]*types.SignedMessage, len(mset.msgs)) + var i uint64 + + for i = mset.nextNonce - 1; mset.msgs[i] != nil; i-- { + set[len(mset.msgs)-int(mset.nextNonce-i)] = mset.msgs[i] + } + return set[len(mset.msgs)-int(mset.nextNonce-i-1):] +} + func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { for _, ts := range revert { for _, b := range ts.Blocks() { diff --git a/node/builder.go b/node/builder.go index 585973c0c..ce647e822 100644 --- a/node/builder.go +++ b/node/builder.go @@ -202,7 +202,7 @@ func Online() Option { // Filecoin services Override(new(*chain.Syncer), chain.NewSyncer), Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient), - Override(new(*chain.MessagePool), chain.NewMessagePool), + Override(new(*chain.MessagePool), modules.MessagePool), Override(new(modules.Genesis), modules.ErrorGenesis), Override(SetGenesisKey, modules.SetGenesis), diff --git a/node/modules/chain.go b/node/modules/chain.go index 17008d30a..fa77a40b8 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -12,9 +12,12 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/routing" + pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/chain" + "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -36,6 +39,16 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt return exch } +func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub) *chain.MessagePool { + mp := chain.NewMessagePool(sm, ps) + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return mp.Close() + }, + }) + return mp +} + func ChainBlockstore(r repo.LockedRepo) (dtypes.ChainBlockstore, error) { blocks, err := r.Datastore("/blocks") if err != nil {