Merge pull request #690 from filecoin-project/fix/message-nonce-head-change
Fix message nonce tracking during head changes
This commit is contained in:
commit
f4c082c7de
@ -1,4 +1,4 @@
|
|||||||
package chain
|
package messagepool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -9,9 +9,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
"github.com/ipfs/go-datastore/query"
|
"github.com/ipfs/go-datastore/query"
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
lps "github.com/whyrusleeping/pubsub"
|
lps "github.com/whyrusleeping/pubsub"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
@ -19,12 +21,16 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("messagepool")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrMessageTooBig = errors.New("message too big")
|
ErrMessageTooBig = errors.New("message too big")
|
||||||
|
|
||||||
@ -56,9 +62,10 @@ type MessagePool struct {
|
|||||||
pending map[address.Address]*msgSet
|
pending map[address.Address]*msgSet
|
||||||
pendingCount int
|
pendingCount int
|
||||||
|
|
||||||
sm *stmgr.StateManager
|
curTsLk sync.Mutex
|
||||||
|
curTs *types.TipSet
|
||||||
|
|
||||||
ps *pubsub.PubSub
|
api Provider
|
||||||
|
|
||||||
minGasPrice types.BigInt
|
minGasPrice types.BigInt
|
||||||
|
|
||||||
@ -98,20 +105,66 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*MessagePool, error) {
|
type Provider interface {
|
||||||
|
SubscribeHeadChanges(func(rev, app []*types.TipSet) error)
|
||||||
|
PutMessage(m store.ChainMsg) (cid.Cid, error)
|
||||||
|
PubSubPublish(string, []byte) error
|
||||||
|
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
|
||||||
|
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
||||||
|
MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error)
|
||||||
|
LoadTipSet(cids []cid.Cid) (*types.TipSet, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type mpoolProvider struct {
|
||||||
|
sm *stmgr.StateManager
|
||||||
|
ps *pubsub.PubSub
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
|
||||||
|
return &mpoolProvider{sm, ps}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) {
|
||||||
|
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) {
|
||||||
|
return mpp.sm.ChainStore().PutMessage(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
||||||
|
return mpp.ps.Publish(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
|
return mpp.sm.GetActor(addr, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
|
return mpp.sm.ChainStore().MessagesForBlock(h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, error) {
|
||||||
|
return mpp.sm.ChainStore().MessagesForTipset(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) {
|
||||||
|
return mpp.sm.ChainStore().LoadTipSet(cids)
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||||
mp := &MessagePool{
|
mp := &MessagePool{
|
||||||
closer: make(chan struct{}),
|
closer: make(chan struct{}),
|
||||||
repubTk: time.NewTicker(build.BlockDelay * 10 * time.Second),
|
repubTk: time.NewTicker(build.BlockDelay * 10 * time.Second),
|
||||||
localAddrs: make(map[address.Address]struct{}),
|
localAddrs: make(map[address.Address]struct{}),
|
||||||
pending: make(map[address.Address]*msgSet),
|
pending: make(map[address.Address]*msgSet),
|
||||||
sm: sm,
|
|
||||||
ps: ps,
|
|
||||||
minGasPrice: types.NewInt(0),
|
minGasPrice: types.NewInt(0),
|
||||||
maxTxPoolSize: 5000,
|
maxTxPoolSize: 5000,
|
||||||
blsSigCache: cache,
|
blsSigCache: cache,
|
||||||
changes: lps.New(50),
|
changes: lps.New(50),
|
||||||
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
|
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
|
||||||
|
api: api,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := mp.loadLocal(); err != nil {
|
if err := mp.loadLocal(); err != nil {
|
||||||
@ -120,7 +173,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.Metadat
|
|||||||
|
|
||||||
go mp.repubLocal()
|
go mp.repubLocal()
|
||||||
|
|
||||||
sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
||||||
err := mp.HeadChange(rev, app)
|
err := mp.HeadChange(rev, app)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("mpool head notif handler error: %+v", err)
|
log.Errorf("mpool head notif handler error: %+v", err)
|
||||||
@ -155,7 +208,7 @@ func (mp *MessagePool) repubLocal() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = mp.ps.Publish(msgTopic, msgb)
|
err = mp.api.PubSubPublish(msgTopic, msgb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
|
errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
|
||||||
continue
|
continue
|
||||||
@ -200,7 +253,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
|
|||||||
}
|
}
|
||||||
mp.lk.Unlock()
|
mp.lk.Unlock()
|
||||||
|
|
||||||
return mp.ps.Publish(msgTopic, msgb)
|
return mp.api.PubSubPublish(msgTopic, msgb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
||||||
@ -252,12 +305,12 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
|||||||
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := mp.sm.ChainStore().PutMessage(m); err != nil {
|
if _, err := mp.api.PutMessage(m); err != nil {
|
||||||
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := mp.sm.ChainStore().PutMessage(&m.Message); err != nil {
|
if _, err := mp.api.PutMessage(&m.Message); err != nil {
|
||||||
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -306,17 +359,56 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
|
|||||||
return stateNonce, nil
|
return stateNonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) setCurTipset(ts *types.TipSet) {
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
defer mp.curTsLk.Unlock()
|
||||||
|
mp.curTs = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) getCurTipset() *types.TipSet {
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
defer mp.curTsLk.Unlock()
|
||||||
|
return mp.curTs
|
||||||
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
|
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
|
||||||
act, err := mp.sm.GetActor(addr, nil)
|
// TODO: this method probably should be cached
|
||||||
|
|
||||||
|
curTs := mp.getCurTipset()
|
||||||
|
act, err := mp.api.StateGetActor(addr, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return act.Nonce, nil
|
baseNonce := act.Nonce
|
||||||
|
|
||||||
|
// TODO: the correct thing to do here is probably to set curTs to chain.head
|
||||||
|
// but since we have an accurate view of the world until a head change occurs,
|
||||||
|
// this should be fine
|
||||||
|
if curTs == nil {
|
||||||
|
return baseNonce, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := mp.api.MessagesForTipset(curTs)
|
||||||
|
if err != nil {
|
||||||
|
return 0, xerrors.Errorf("failed to check messages for tipset: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range msgs {
|
||||||
|
msg := m.VMMessage()
|
||||||
|
if msg.From == addr {
|
||||||
|
if msg.Nonce != baseNonce {
|
||||||
|
return 0, xerrors.Errorf("tipset %s has bad nonce ordering (%d != %d)", curTs.Cids(), msg.Nonce, baseNonce)
|
||||||
|
}
|
||||||
|
baseNonce++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return baseNonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) {
|
func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) {
|
||||||
act, err := mp.sm.GetActor(addr, nil)
|
act, err := mp.api.StateGetActor(addr, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.EmptyInt, err
|
return types.EmptyInt, err
|
||||||
}
|
}
|
||||||
@ -327,6 +419,9 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro
|
|||||||
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
if addr.Protocol() == address.ID {
|
||||||
|
log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr)
|
||||||
|
}
|
||||||
|
|
||||||
nonce, err := mp.getNonceLocked(addr)
|
nonce, err := mp.getNonceLocked(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -350,7 +445,7 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ
|
|||||||
log.Errorf("addLocal failed: %+v", err)
|
log.Errorf("addLocal failed: %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return msg, mp.ps.Publish(msgTopic, msgb)
|
return msg, mp.api.PubSubPublish(msgTopic, msgb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
||||||
@ -421,9 +516,16 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
|
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
|
||||||
|
|
||||||
for _, ts := range revert {
|
for _, ts := range revert {
|
||||||
|
pts, err := mp.api.LoadTipSet(ts.Parents())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mp.setCurTipset(pts)
|
||||||
for _, b := range ts.Blocks() {
|
for _, b := range ts.Blocks() {
|
||||||
bmsgs, smsgs, err := mp.sm.ChainStore().MessagesForBlock(b)
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err)
|
return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err)
|
||||||
}
|
}
|
||||||
@ -448,7 +550,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
|
|
||||||
for _, ts := range apply {
|
for _, ts := range apply {
|
||||||
for _, b := range ts.Blocks() {
|
for _, b := range ts.Blocks() {
|
||||||
bmsgs, smsgs, err := mp.sm.ChainStore().MessagesForBlock(b)
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
||||||
}
|
}
|
||||||
@ -460,6 +562,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
mp.Remove(msg.From, msg.Nonce)
|
mp.Remove(msg.From, msg.Nonce)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
mp.setCurTipset(ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -487,7 +590,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err
|
|||||||
sub := mp.changes.Sub(localUpdates)
|
sub := mp.changes.Sub(localUpdates)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer mp.changes.Unsub(sub, localIncoming)
|
defer mp.changes.Unsub(sub, chain.LocalIncoming)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
223
chain/messagepool/messagepool_test.go
Normal file
223
chain/messagepool/messagepool_test.go
Normal file
@ -0,0 +1,223 @@
|
|||||||
|
package messagepool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||||
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testMpoolApi struct {
|
||||||
|
cb func(rev, app []*types.TipSet) error
|
||||||
|
|
||||||
|
bmsgs map[cid.Cid][]*types.SignedMessage
|
||||||
|
statenonce map[address.Address]uint64
|
||||||
|
|
||||||
|
tipsets []*types.TipSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestMpoolApi() *testMpoolApi {
|
||||||
|
return &testMpoolApi{
|
||||||
|
bmsgs: make(map[cid.Cid][]*types.SignedMessage),
|
||||||
|
statenonce: make(map[address.Address]uint64),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) applyBlock(t *testing.T, b *types.BlockHeader) {
|
||||||
|
t.Helper()
|
||||||
|
if err := tma.cb(nil, []*types.TipSet{mock.TipSet(b)}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) revertBlock(t *testing.T, b *types.BlockHeader) {
|
||||||
|
t.Helper()
|
||||||
|
if err := tma.cb([]*types.TipSet{mock.TipSet(b)}, nil); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) setStateNonce(addr address.Address, v uint64) {
|
||||||
|
tma.statenonce[addr] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) {
|
||||||
|
tma.bmsgs[h.Cid()] = msgs
|
||||||
|
tma.tipsets = append(tma.tipsets, mock.TipSet(h))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) {
|
||||||
|
tma.cb = cb
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) PutMessage(m store.ChainMsg) (cid.Cid, error) {
|
||||||
|
return cid.Undef, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) PubSubPublish(string, []byte) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
|
return &types.Actor{
|
||||||
|
Nonce: tma.statenonce[addr],
|
||||||
|
Balance: types.NewInt(90000000),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
|
return nil, tma.bmsgs[h.Cid()], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, error) {
|
||||||
|
if len(ts.Blocks()) != 1 {
|
||||||
|
panic("cant deal with multiblock tipsets in this test")
|
||||||
|
}
|
||||||
|
|
||||||
|
bm, sm, err := tma.MessagesForBlock(ts.Blocks()[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var out []store.ChainMsg
|
||||||
|
for _, m := range bm {
|
||||||
|
out = append(out, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range sm {
|
||||||
|
out = append(out, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolApi) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) {
|
||||||
|
for _, ts := range tma.tipsets {
|
||||||
|
if types.CidArrsEqual(cids, ts.Cids()) {
|
||||||
|
return ts, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("tipset not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) {
|
||||||
|
t.Helper()
|
||||||
|
n, err := mp.GetNonce(addr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n != val {
|
||||||
|
t.Fatalf("expected nonce of %d, got %d", val, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
|
||||||
|
t.Helper()
|
||||||
|
if err := mp.Add(msg); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMessagePool(t *testing.T) {
|
||||||
|
tma := newTestMpoolApi()
|
||||||
|
|
||||||
|
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
|
mp, err := New(tma, ds)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := mock.MkBlock(nil, 1, 1)
|
||||||
|
|
||||||
|
sender, err := w.GenerateKey(types.KTBLS)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
target := mock.Address(1001)
|
||||||
|
|
||||||
|
var msgs []*types.SignedMessage
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
msgs = append(msgs, mock.MkMessage(sender, target, uint64(i), w))
|
||||||
|
}
|
||||||
|
|
||||||
|
tma.setStateNonce(sender, 0)
|
||||||
|
assertNonce(t, mp, sender, 0)
|
||||||
|
mustAdd(t, mp, msgs[0])
|
||||||
|
assertNonce(t, mp, sender, 1)
|
||||||
|
mustAdd(t, mp, msgs[1])
|
||||||
|
assertNonce(t, mp, sender, 2)
|
||||||
|
|
||||||
|
tma.setBlockMessages(a, msgs[0], msgs[1])
|
||||||
|
tma.applyBlock(t, a)
|
||||||
|
|
||||||
|
assertNonce(t, mp, sender, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRevertMessages(t *testing.T) {
|
||||||
|
tma := newTestMpoolApi()
|
||||||
|
|
||||||
|
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
|
mp, err := New(tma, ds)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := mock.MkBlock(nil, 1, 1)
|
||||||
|
b := mock.MkBlock(mock.TipSet(a), 1, 1)
|
||||||
|
|
||||||
|
sender, err := w.GenerateKey(types.KTBLS)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
target := mock.Address(1001)
|
||||||
|
|
||||||
|
var msgs []*types.SignedMessage
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
msgs = append(msgs, mock.MkMessage(sender, target, uint64(i), w))
|
||||||
|
}
|
||||||
|
|
||||||
|
tma.setBlockMessages(a, msgs[0])
|
||||||
|
tma.setBlockMessages(b, msgs[1], msgs[2], msgs[3])
|
||||||
|
|
||||||
|
mustAdd(t, mp, msgs[0])
|
||||||
|
mustAdd(t, mp, msgs[1])
|
||||||
|
mustAdd(t, mp, msgs[2])
|
||||||
|
mustAdd(t, mp, msgs[3])
|
||||||
|
|
||||||
|
tma.setStateNonce(sender, 0)
|
||||||
|
tma.applyBlock(t, a)
|
||||||
|
assertNonce(t, mp, sender, 4)
|
||||||
|
|
||||||
|
tma.setStateNonce(sender, 1)
|
||||||
|
tma.applyBlock(t, b)
|
||||||
|
assertNonce(t, mp, sender, 4)
|
||||||
|
tma.setStateNonce(sender, 0)
|
||||||
|
tma.revertBlock(t, b)
|
||||||
|
|
||||||
|
assertNonce(t, mp, sender, 4)
|
||||||
|
|
||||||
|
if len(mp.Pending()) != 3 {
|
||||||
|
t.Fatal("expected three messages in mempool")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -601,6 +601,7 @@ func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) {
|
|||||||
type ChainMsg interface {
|
type ChainMsg interface {
|
||||||
Cid() cid.Cid
|
Cid() cid.Cid
|
||||||
VMMessage() *types.Message
|
VMMessage() *types.Message
|
||||||
|
ToStorageBlock() (block.Block, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) MessagesForTipset(ts *types.TipSet) ([]ChainMsg, error) {
|
func (cs *ChainStore) MessagesForTipset(ts *types.TipSet) ([]ChainMsg, error) {
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -54,7 +55,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub *pubsub.Subscription) {
|
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
|
||||||
for {
|
for {
|
||||||
msg, err := msub.Next(ctx)
|
msg, err := msub.Next(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -39,7 +39,7 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("chain")
|
var log = logging.Logger("chain")
|
||||||
|
|
||||||
var localIncoming = "incoming"
|
var LocalIncoming = "incoming"
|
||||||
|
|
||||||
type Syncer struct {
|
type Syncer struct {
|
||||||
// The heaviest known tipset in the network.
|
// The heaviest known tipset in the network.
|
||||||
@ -119,7 +119,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
syncer.incoming.Pub(fts.TipSet().Blocks(), localIncoming)
|
syncer.incoming.Pub(fts.TipSet().Blocks(), LocalIncoming)
|
||||||
|
|
||||||
if from == syncer.self {
|
if from == syncer.self {
|
||||||
// TODO: this is kindof a hack...
|
// TODO: this is kindof a hack...
|
||||||
@ -152,11 +152,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
|
func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
|
||||||
sub := syncer.incoming.Sub(localIncoming)
|
sub := syncer.incoming.Sub(LocalIncoming)
|
||||||
out := make(chan *types.BlockHeader, 10)
|
out := make(chan *types.BlockHeader, 10)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer syncer.incoming.Unsub(sub, localIncoming)
|
defer syncer.incoming.Unsub(sub, LocalIncoming)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -1,10 +1,12 @@
|
|||||||
package mock
|
package mock
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -16,6 +18,26 @@ func Address(i uint64) address.Address {
|
|||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func MkMessage(from, to address.Address, nonce uint64, w *wallet.Wallet) *types.SignedMessage {
|
||||||
|
msg := &types.Message{
|
||||||
|
To: to,
|
||||||
|
From: from,
|
||||||
|
Value: types.NewInt(1),
|
||||||
|
Nonce: nonce,
|
||||||
|
GasLimit: types.NewInt(1),
|
||||||
|
GasPrice: types.NewInt(0),
|
||||||
|
}
|
||||||
|
|
||||||
|
sig, err := w.Sign(context.TODO(), from, msg.Cid().Bytes())
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return &types.SignedMessage{
|
||||||
|
Message: *msg,
|
||||||
|
Signature: *sig,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types.BlockHeader {
|
func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types.BlockHeader {
|
||||||
addr := Address(123561)
|
addr := Address(123561)
|
||||||
|
|
||||||
|
@ -279,7 +279,7 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to create block: %w", err)
|
return nil, xerrors.Errorf("failed to create block: %w", err)
|
||||||
}
|
}
|
||||||
log.Infow("mined new block", "cid", b.Cid())
|
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height)
|
||||||
|
|
||||||
dur := time.Now().Sub(start)
|
dur := time.Now().Sub(start)
|
||||||
log.Infof("Creating block took %s", dur)
|
log.Infof("Creating block took %s", dur)
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/deals"
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
"github.com/filecoin-project/lotus/chain/gen"
|
"github.com/filecoin-project/lotus/chain/gen"
|
||||||
"github.com/filecoin-project/lotus/chain/market"
|
"github.com/filecoin-project/lotus/chain/market"
|
||||||
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/metrics"
|
"github.com/filecoin-project/lotus/chain/metrics"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
@ -202,7 +203,7 @@ func Online() Option {
|
|||||||
// Filecoin services
|
// Filecoin services
|
||||||
Override(new(*chain.Syncer), modules.NewSyncer),
|
Override(new(*chain.Syncer), modules.NewSyncer),
|
||||||
Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient),
|
Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient),
|
||||||
Override(new(*chain.MessagePool), modules.MessagePool),
|
Override(new(*messagepool.MessagePool), modules.MessagePool),
|
||||||
|
|
||||||
Override(new(modules.Genesis), modules.ErrorGenesis),
|
Override(new(modules.Genesis), modules.ErrorGenesis),
|
||||||
Override(SetGenesisKey, modules.SetGenesis),
|
Override(SetGenesisKey, modules.SetGenesis),
|
||||||
|
@ -7,8 +7,8 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain"
|
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -17,7 +17,7 @@ type MpoolAPI struct {
|
|||||||
|
|
||||||
WalletAPI
|
WalletAPI
|
||||||
|
|
||||||
Mpool *chain.MessagePool
|
Mpool *messagepool.MessagePool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/blocksync"
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
@ -41,8 +42,9 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt
|
|||||||
return exch
|
return exch
|
||||||
}
|
}
|
||||||
|
|
||||||
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*chain.MessagePool, error) {
|
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*messagepool.MessagePool, error) {
|
||||||
mp, err := chain.NewMessagePool(sm, ps, ds)
|
mpp := messagepool.NewProvider(sm, ps)
|
||||||
|
mp, err := messagepool.New(mpp, ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/blocksync"
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
"github.com/filecoin-project/lotus/chain/deals"
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/sub"
|
"github.com/filecoin-project/lotus/chain/sub"
|
||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
@ -53,7 +54,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubs
|
|||||||
go sub.HandleIncomingBlocks(ctx, blocksub, s)
|
go sub.HandleIncomingBlocks(ctx, blocksub, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *chain.MessagePool) {
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *messagepool.MessagePool) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
msgsub, err := pubsub.Subscribe("/fil/messages")
|
msgsub, err := pubsub.Subscribe("/fil/messages")
|
||||||
|
Loading…
Reference in New Issue
Block a user