lotus/chain/messagepool.go

257 lines
5.5 KiB
Go
Raw Normal View History

2019-07-05 14:46:21 +00:00
package chain
import (
2019-08-09 15:59:12 +00:00
"encoding/base64"
2019-07-05 14:46:21 +00:00
"sync"
2019-07-08 12:51:45 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
2019-09-20 09:01:49 +00:00
"github.com/pkg/errors"
"golang.org/x/xerrors"
2019-07-08 12:51:45 +00:00
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/types"
2019-07-05 14:46:21 +00:00
)
type MessagePool struct {
lk sync.Mutex
pending map[address.Address]*msgSet
sm *stmgr.StateManager
2019-09-16 14:17:08 +00:00
ps *pubsub.PubSub
2019-07-05 14:46:21 +00:00
}
type msgSet struct {
2019-09-20 09:01:49 +00:00
msgs map[uint64]*types.SignedMessage
nextNonce uint64
2019-07-05 14:46:21 +00:00
}
func newMsgSet() *msgSet {
return &msgSet{
msgs: make(map[uint64]*types.SignedMessage),
2019-07-05 14:46:21 +00:00
}
}
2019-09-20 09:01:49 +00:00
func (ms *msgSet) add(m *types.SignedMessage) error {
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce {
ms.nextNonce = m.Message.Nonce + 1
}
if _, has := ms.msgs[m.Message.Nonce]; has {
if m.Cid() != ms.msgs[m.Message.Nonce].Cid() {
log.Error("Add with duplicate nonce")
return xerrors.Errorf("message to %s with nonce %d already in mpool")
}
log.Warn("Add called with the same message multiple times")
2019-07-05 14:46:21 +00:00
}
ms.msgs[m.Message.Nonce] = m
2019-09-20 09:01:49 +00:00
return nil
2019-07-05 14:46:21 +00:00
}
2019-09-16 14:17:08 +00:00
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
2019-07-05 14:46:21 +00:00
mp := &MessagePool{
pending: make(map[address.Address]*msgSet),
sm: sm,
2019-09-16 21:26:19 +00:00
ps: ps,
2019-07-05 14:46:21 +00:00
}
sm.ChainStore().SubscribeHeadChanges(mp.HeadChange)
2019-07-05 14:46:21 +00:00
return mp
}
2019-09-16 14:17:08 +00:00
// Push serializes m, adds it to the pool and then publishes the serialized
// bytes on the "/fil/messages" pubsub topic.
//
// The first error encountered (serialization, Add, or publish) is returned;
// nothing is published when serialization or Add fails.
func (mp *MessagePool) Push(m *types.SignedMessage) error {
	encoded, err := m.Serialize()
	if err != nil {
		return err
	}

	err = mp.Add(m)
	if err != nil {
		return err
	}

	return mp.ps.Publish("/fil/messages", encoded)
}
func (mp *MessagePool) Add(m *types.SignedMessage) error {
2019-07-05 14:46:21 +00:00
mp.lk.Lock()
defer mp.lk.Unlock()
2019-09-16 14:17:08 +00:00
return mp.addLocked(m)
}
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
2019-07-05 14:46:21 +00:00
data, err := m.Message.Serialize()
if err != nil {
return err
}
2019-09-20 09:01:49 +00:00
log.Infof("mpooladd: %d %s", m.Message.Nonce, base64.StdEncoding.EncodeToString(data))
2019-08-09 15:59:12 +00:00
2019-07-05 14:46:21 +00:00
if err := m.Signature.Verify(m.Message.From, data); err != nil {
2019-09-20 09:01:49 +00:00
log.Warnf("mpooladd signature verification failed: %s", err)
2019-07-05 14:46:21 +00:00
return err
}
if _, err := mp.sm.ChainStore().PutMessage(m); err != nil {
2019-09-20 09:01:49 +00:00
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
2019-07-05 14:46:21 +00:00
return err
}
mset, ok := mp.pending[m.Message.From]
if !ok {
mset = newMsgSet()
mp.pending[m.Message.From] = mset
}
mset.add(m)
return nil
}
2019-07-17 06:05:11 +00:00
func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
mp.lk.Lock()
defer mp.lk.Unlock()
2019-09-16 14:17:08 +00:00
return mp.getNonceLocked(addr)
}
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
2019-07-17 06:05:11 +00:00
mset, ok := mp.pending[addr]
if ok {
2019-09-20 09:01:49 +00:00
return mset.nextNonce, nil
2019-07-17 06:05:11 +00:00
}
act, err := mp.sm.GetActor(addr, nil)
2019-07-17 06:05:11 +00:00
if err != nil {
return 0, err
}
return act.Nonce, nil
}
2019-09-17 08:15:26 +00:00
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
2019-09-16 14:17:08 +00:00
mp.lk.Lock()
defer mp.lk.Unlock()
nonce, err := mp.getNonceLocked(addr)
if err != nil {
2019-09-17 08:15:26 +00:00
return nil, err
2019-09-16 14:17:08 +00:00
}
msg, err := cb(nonce)
if err != nil {
2019-09-17 08:15:26 +00:00
return nil, err
2019-09-16 14:17:08 +00:00
}
msgb, err := msg.Serialize()
if err != nil {
2019-09-17 08:15:26 +00:00
return nil, err
2019-09-16 14:17:08 +00:00
}
if err := mp.addLocked(msg); err != nil {
2019-09-17 08:15:26 +00:00
return nil, err
2019-09-16 14:17:08 +00:00
}
2019-09-17 08:15:26 +00:00
return msg, mp.ps.Publish("/fil/messages", msgb)
2019-09-16 14:17:08 +00:00
}
2019-08-14 04:43:29 +00:00
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
2019-07-05 14:46:21 +00:00
mp.lk.Lock()
defer mp.lk.Unlock()
2019-08-14 04:43:29 +00:00
mset, ok := mp.pending[from]
2019-07-05 14:46:21 +00:00
if !ok {
return
}
// NB: This deletes any message with the given nonce. This makes sense
// as two messages with the same sender cannot have the same nonce
2019-08-14 04:43:29 +00:00
delete(mset.msgs, nonce)
2019-07-05 14:46:21 +00:00
if len(mset.msgs) == 0 {
2019-09-20 09:01:49 +00:00
// FIXME: This is racy
//delete(mp.pending, from)
} else {
var max uint64
for nonce := range mset.msgs {
if max < nonce {
max = nonce
}
}
mset.nextNonce = max + 1
2019-07-05 14:46:21 +00:00
}
}
func (mp *MessagePool) Pending() []*types.SignedMessage {
2019-07-05 14:46:21 +00:00
mp.lk.Lock()
defer mp.lk.Unlock()
2019-09-06 22:32:42 +00:00
out := make([]*types.SignedMessage, 0)
2019-07-05 14:46:21 +00:00
for _, mset := range mp.pending {
2019-09-20 09:01:49 +00:00
if len(mset.msgs) == 0 {
continue
2019-07-05 14:46:21 +00:00
}
2019-09-20 09:01:49 +00:00
set := make([]*types.SignedMessage, len(mset.msgs))
var i uint64
for i = mset.nextNonce - 1; mset.msgs[i] != nil; i-- {
set[len(mset.msgs)-int(mset.nextNonce-i)] = mset.msgs[i]
2019-07-05 14:46:21 +00:00
}
2019-09-20 09:01:49 +00:00
out = append(out, set[len(mset.msgs)-int(mset.nextNonce-i-1):]...)
2019-07-05 14:46:21 +00:00
}
return out
}
2019-07-26 04:54:22 +00:00
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
2019-07-05 14:46:21 +00:00
for _, ts := range revert {
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.sm.ChainStore().MessagesForBlock(b)
2019-07-05 14:46:21 +00:00
if err != nil {
2019-07-31 07:13:49 +00:00
return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height)
2019-07-05 14:46:21 +00:00
}
for _, msg := range smsgs {
2019-07-05 14:46:21 +00:00
if err := mp.Add(msg); err != nil {
return err
}
}
for _, msg := range bmsgs {
smsg := mp.RecoverSig(msg)
if smsg != nil {
if err := mp.Add(smsg); err != nil {
return err
}
} else {
log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid())
}
}
2019-07-05 14:46:21 +00:00
}
}
for _, ts := range apply {
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.sm.ChainStore().MessagesForBlock(b)
2019-07-05 14:46:21 +00:00
if err != nil {
2019-07-31 07:13:49 +00:00
return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages)
2019-07-05 14:46:21 +00:00
}
for _, msg := range smsgs {
2019-08-14 04:43:29 +00:00
mp.Remove(msg.Message.From, msg.Message.Nonce)
2019-07-05 14:46:21 +00:00
}
for _, msg := range bmsgs {
2019-08-14 04:43:29 +00:00
mp.Remove(msg.From, msg.Nonce)
}
2019-07-05 14:46:21 +00:00
}
}
return nil
}
// RecoverSig is meant to return the signed form of msg. It is currently a
// stub that always returns nil, which HeadChange treats as "signature could
// not be recovered".
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
// TODO: persist signatures for BLS messages for a little while in case of reorgs
return nil
}