Merge pull request #596 from filecoin-project/feat/rebrc-mempool
Add rebup for local mesages
This commit is contained in:
commit
3e22712f2d
@ -1,11 +1,14 @@
|
||||
package chain
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/multierr"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
@ -26,9 +29,18 @@ var (
|
||||
ErrInvalidToAddr = errors.New("message had invalid to address")
|
||||
)
|
||||
|
||||
const (
|
||||
msgTopic = "/fil/messages"
|
||||
)
|
||||
|
||||
type MessagePool struct {
|
||||
lk sync.Mutex
|
||||
|
||||
closer chan struct{}
|
||||
repubTk *time.Ticker
|
||||
|
||||
localAddrs map[address.Address]struct{}
|
||||
|
||||
pending map[address.Address]*msgSet
|
||||
pendingCount int
|
||||
|
||||
@ -73,6 +85,9 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
|
||||
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
|
||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||
mp := &MessagePool{
|
||||
closer: make(chan struct{}),
|
||||
repubTk: time.NewTicker(build.BlockDelay * 10 * time.Second),
|
||||
localAddrs: make(map[address.Address]struct{}),
|
||||
pending: make(map[address.Address]*msgSet),
|
||||
sm: sm,
|
||||
ps: ps,
|
||||
@ -91,6 +106,52 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
|
||||
return mp
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Close() error {
|
||||
close(mp.closer)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *MessagePool) repubLocal() {
|
||||
for {
|
||||
select {
|
||||
case <-mp.repubTk.C:
|
||||
mp.lk.Lock()
|
||||
msgs := make([]*types.SignedMessage, 0)
|
||||
for a := range mp.localAddrs {
|
||||
msgs = append(msgs, mp.pendingFor(a)...)
|
||||
}
|
||||
mp.lk.Unlock()
|
||||
|
||||
var errout error
|
||||
for _, msg := range msgs {
|
||||
msgb, err := msg.Serialize()
|
||||
if err != nil {
|
||||
multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
|
||||
continue
|
||||
}
|
||||
|
||||
err = mp.ps.Publish(msgTopic, msgb)
|
||||
if err != nil {
|
||||
multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if errout != nil {
|
||||
log.Errorf("errors while republishing: %+v", errout)
|
||||
}
|
||||
case <-mp.closer:
|
||||
mp.repubTk.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (mp *MessagePool) addLocal(a address.Address) {
|
||||
mp.localAddrs[a] = struct{}{}
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Push(m *types.SignedMessage) error {
|
||||
msgb, err := m.Serialize()
|
||||
if err != nil {
|
||||
@ -101,7 +162,11 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return mp.ps.Publish("/fil/messages", msgb)
|
||||
mp.lk.Lock()
|
||||
mp.addLocal(m.Message.From)
|
||||
mp.lk.Unlock()
|
||||
|
||||
return mp.ps.Publish(msgTopic, msgb)
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
||||
@ -224,8 +289,9 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ
|
||||
if err := mp.addLocked(msg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mp.addLocal(msg.Message.From)
|
||||
|
||||
return msg, mp.ps.Publish("/fil/messages", msgb)
|
||||
return msg, mp.ps.Publish(msgTopic, msgb)
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
||||
@ -259,24 +325,32 @@ func (mp *MessagePool) Pending() []*types.SignedMessage {
|
||||
mp.lk.Lock()
|
||||
defer mp.lk.Unlock()
|
||||
out := make([]*types.SignedMessage, 0)
|
||||
for _, mset := range mp.pending {
|
||||
if len(mset.msgs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
set := make([]*types.SignedMessage, len(mset.msgs))
|
||||
var i uint64
|
||||
|
||||
for i = mset.nextNonce - 1; mset.msgs[i] != nil; i-- {
|
||||
set[len(mset.msgs)-int(mset.nextNonce-i)] = mset.msgs[i]
|
||||
}
|
||||
|
||||
out = append(out, set[len(mset.msgs)-int(mset.nextNonce-i-1):]...)
|
||||
for a := range mp.pending {
|
||||
out = append(out, mp.pendingFor(a)...)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
|
||||
mset := mp.pending[a]
|
||||
if mset == nil || len(mset.msgs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
set := make([]*types.SignedMessage, 0, len(mset.msgs))
|
||||
|
||||
for _, m := range mset.msgs {
|
||||
set = append(set, m)
|
||||
}
|
||||
|
||||
sort.Slice(set, func(i, j int) bool {
|
||||
return set[i].Message.Nonce < set[j].Message.Nonce
|
||||
})
|
||||
|
||||
return set
|
||||
}
|
||||
|
||||
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
|
||||
for _, ts := range revert {
|
||||
for _, b := range ts.Blocks() {
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
const sectorSize = 1024
|
||||
|
||||
func TestSealAndVerify(t *testing.T) {
|
||||
//t.Skip("this is slow")
|
||||
t.Skip("this is slow")
|
||||
os.Setenv("BELLMAN_NO_GPU", "1")
|
||||
os.Setenv("RUST_LOG", "info")
|
||||
|
||||
|
@ -202,7 +202,7 @@ func Online() Option {
|
||||
// Filecoin services
|
||||
Override(new(*chain.Syncer), chain.NewSyncer),
|
||||
Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient),
|
||||
Override(new(*chain.MessagePool), chain.NewMessagePool),
|
||||
Override(new(*chain.MessagePool), modules.MessagePool),
|
||||
|
||||
Override(new(modules.Genesis), modules.ErrorGenesis),
|
||||
Override(SetGenesisKey, modules.SetGenesis),
|
||||
|
@ -12,9 +12,12 @@ import (
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/routing"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
@ -36,6 +39,16 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt
|
||||
return exch
|
||||
}
|
||||
|
||||
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub) *chain.MessagePool {
|
||||
mp := chain.NewMessagePool(sm, ps)
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(_ context.Context) error {
|
||||
return mp.Close()
|
||||
},
|
||||
})
|
||||
return mp
|
||||
}
|
||||
|
||||
func ChainBlockstore(r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
|
||||
blocks, err := r.Datastore("/blocks")
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user