Add repub for local messages

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2019-11-13 22:53:18 +01:00
parent 4f9947a27b
commit 2bc5ccab04
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
3 changed files with 98 additions and 16 deletions

View File

@ -2,10 +2,12 @@ package chain
import (
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
@ -26,9 +28,18 @@ var (
ErrInvalidToAddr = errors.New("message had invalid to address")
)
const (
msgTopic = "/fil/messages"
)
type MessagePool struct {
lk sync.Mutex
closer chan struct{}
repubTk *time.Ticker
localAddrs map[address.Address]struct{}
pending map[address.Address]*msgSet
pendingCount int
@ -73,6 +84,9 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
mp := &MessagePool{
closer: make(chan struct{}),
repubTk: time.NewTicker(2 * time.Minute),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
sm: sm,
ps: ps,
@ -91,6 +105,52 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
return mp
}
func (mp *MessagePool) Close() error {
close(mp.closer)
return nil
}
func (mp *MessagePool) repubLocal() {
for {
select {
case <-mp.repubTk.C:
mp.lk.Lock()
msgs := make([]*types.SignedMessage, 0)
for a := range mp.localAddrs {
msgs = append(msgs, mp.pendingFor(a)...)
}
mp.lk.Unlock()
var errout error
for _, msg := range msgs {
msgb, err := msg.Serialize()
if err != nil {
multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
continue
}
err = mp.ps.Publish(msgTopic, msgb)
if err != nil {
multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
continue
}
}
if errout != nil {
log.Errorf("errors while republishing: %+v", errout)
}
case <-mp.closer:
mp.repubTk.Stop()
return
}
}
}
func (mp *MessagePool) addLocal(a address.Address) {
mp.localAddrs[a] = struct{}{}
}
func (mp *MessagePool) Push(m *types.SignedMessage) error {
msgb, err := m.Serialize()
if err != nil {
@ -101,7 +161,11 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
return err
}
return mp.ps.Publish("/fil/messages", msgb)
mp.lk.Lock()
mp.addLocal(m.Message.From)
mp.lk.Unlock()
return mp.ps.Publish(msgTopic, msgb)
}
func (mp *MessagePool) Add(m *types.SignedMessage) error {
@ -224,8 +288,9 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ
if err := mp.addLocked(msg); err != nil {
return nil, err
}
mp.addLocal(msg.Message.From)
return msg, mp.ps.Publish("/fil/messages", msgb)
return msg, mp.ps.Publish(msgTopic, msgb)
}
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
@ -259,24 +324,28 @@ func (mp *MessagePool) Pending() []*types.SignedMessage {
mp.lk.Lock()
defer mp.lk.Unlock()
out := make([]*types.SignedMessage, 0)
for _, mset := range mp.pending {
if len(mset.msgs) == 0 {
continue
}
set := make([]*types.SignedMessage, len(mset.msgs))
var i uint64
for i = mset.nextNonce - 1; mset.msgs[i] != nil; i-- {
set[len(mset.msgs)-int(mset.nextNonce-i)] = mset.msgs[i]
}
out = append(out, set[len(mset.msgs)-int(mset.nextNonce-i-1):]...)
for a := range mp.pending {
out = append(out, mp.pendingFor(a)...)
}
return out
}
func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
mset := mp.pending[a]
if mset == nil || len(mset.msgs) == 0 {
return nil
}
set := make([]*types.SignedMessage, len(mset.msgs))
var i uint64
for i = mset.nextNonce - 1; mset.msgs[i] != nil; i-- {
set[len(mset.msgs)-int(mset.nextNonce-i)] = mset.msgs[i]
}
return set[len(mset.msgs)-int(mset.nextNonce-i-1):]
}
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
for _, ts := range revert {
for _, b := range ts.Blocks() {

View File

@ -202,7 +202,7 @@ func Online() Option {
// Filecoin services
Override(new(*chain.Syncer), chain.NewSyncer),
Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient),
Override(new(*chain.MessagePool), chain.NewMessagePool),
Override(new(*chain.MessagePool), modules.MessagePool),
Override(new(modules.Genesis), modules.ErrorGenesis),
Override(SetGenesisKey, modules.SetGenesis),

View File

@ -12,9 +12,12 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -36,6 +39,16 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt
return exch
}
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub) *chain.MessagePool {
mp := chain.NewMessagePool(sm, ps)
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return mp.Close()
},
})
return mp
}
func ChainBlockstore(r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
blocks, err := r.Datastore("/blocks")
if err != nil {