diff --git a/chain/messagepool.go b/chain/messagepool.go index bbffd45a5..550aa8428 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -2,13 +2,15 @@ package chain import ( "encoding/base64" - pubsub "github.com/libp2p/go-libp2p-pubsub" "sync" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/pkg/errors" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/stmgr" "github.com/filecoin-project/go-lotus/chain/types" - "github.com/pkg/errors" ) type MessagePool struct { @@ -22,8 +24,8 @@ type MessagePool struct { } type msgSet struct { - msgs map[uint64]*types.SignedMessage - startNonce uint64 + msgs map[uint64]*types.SignedMessage + nextNonce uint64 } func newMsgSet() *msgSet { @@ -32,11 +34,20 @@ func newMsgSet() *msgSet { } } -func (ms *msgSet) add(m *types.SignedMessage) { - if len(ms.msgs) == 0 || m.Message.Nonce < ms.startNonce { - ms.startNonce = m.Message.Nonce +func (ms *msgSet) add(m *types.SignedMessage) error { + if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce { + ms.nextNonce = m.Message.Nonce + 1 + } + if _, has := ms.msgs[m.Message.Nonce]; has { + if m.Cid() != ms.msgs[m.Message.Nonce].Cid() { + log.Error("Add with duplicate nonce") + return xerrors.Errorf("message to %s with nonce %d already in mpool") + } + log.Warn("Add called with the same message multiple times") } ms.msgs[m.Message.Nonce] = m + + return nil } func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool { @@ -76,13 +87,15 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { return err } - log.Info("mpooladd: %s", base64.StdEncoding.EncodeToString(data)) + log.Infof("mpooladd: %d %s", m.Message.Nonce, base64.StdEncoding.EncodeToString(data)) if err := m.Signature.Verify(m.Message.From, data); err != nil { + log.Warnf("mpooladd signature verification failed: %s", err) return err } if _, err := mp.sm.ChainStore().PutMessage(m); err != nil { + log.Warnf("mpooladd cs.PutMessage failed: %s", err) return err } @@ -106,7 +119,7 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { mset, ok := mp.pending[addr] if ok { - return mset.startNonce + uint64(len(mset.msgs)), nil + return mset.nextNonce, nil } act, err := mp.sm.GetActor(addr) @@ -157,7 +170,16 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) { delete(mset.msgs, nonce) if len(mset.msgs) == 0 { - delete(mp.pending, from) + // FIXME: This is racy + //delete(mp.pending, from) + } else { + var max uint64 + for nonce := range mset.msgs { + if max < nonce { + max = nonce + } + } + mset.nextNonce = max + 1 } } @@ -166,13 +188,18 @@ func (mp *MessagePool) Pending() []*types.SignedMessage { defer mp.lk.Unlock() out := make([]*types.SignedMessage, 0) for _, mset := range mp.pending { - for i := mset.startNonce; true; i++ { - m, ok := mset.msgs[i] - if !ok { - break - } - out = append(out, m) + if len(mset.msgs) == 0 { + continue } + + set := make([]*types.SignedMessage, len(mset.msgs)) + var i uint64 + + for i = mset.nextNonce - 1; mset.msgs[i] != nil; i-- { + set[len(mset.msgs)-int(mset.nextNonce-i)] = mset.msgs[i] + } + + out = append(out, set[len(mset.msgs)-int(mset.nextNonce-i-1):]...) } return out diff --git a/lotuspond/front/src/ConnMgr.js b/lotuspond/front/src/ConnMgr.js index c53601883..778d717f8 100644 --- a/lotuspond/front/src/ConnMgr.js +++ b/lotuspond/front/src/ConnMgr.js @@ -25,8 +25,8 @@ class ConnMgr extends React.Component { const nodes = this.props.nodes let keys = Object.keys(nodes) - const newConns = await keys.filter((_, i) => i > 0).map(async (kfrom, i) => { - return keys.filter((_, j) => i >= j).map(async kto => { + const newConns = await keys.filter((_, i) => i > 0).filter(kfrom => this.props.nodes[kfrom].conn !== undefined).map(async (kfrom, i) => { + return keys.filter((_, j) => i >= j).filter(kto => this.props.nodes[kto].conn !== undefined).map(async kto => { const fromNd = this.props.nodes[kfrom] const toNd = this.props.nodes[kto]