Merge pull request #675 from filecoin-project/fix/mpool
Fix some Mpool / PoSt sched bugs
This commit is contained in:
commit
7952b7525e
7
Makefile
7
Makefile
@ -60,9 +60,12 @@ CLEAN+=build/.update-modules
|
|||||||
deps: $(BUILD_DEPS)
|
deps: $(BUILD_DEPS)
|
||||||
.PHONY: deps
|
.PHONY: deps
|
||||||
|
|
||||||
|
debug: GOFLAGS=-tags=debug
|
||||||
|
debug: lotus lotus-storage-miner
|
||||||
|
|
||||||
lotus: $(BUILD_DEPS)
|
lotus: $(BUILD_DEPS)
|
||||||
rm -f lotus
|
rm -f lotus
|
||||||
go build -o lotus ./cmd/lotus
|
go build $(GOFLAGS) -o lotus ./cmd/lotus
|
||||||
go run github.com/GeertJohan/go.rice/rice append --exec lotus -i ./build
|
go run github.com/GeertJohan/go.rice/rice append --exec lotus -i ./build
|
||||||
|
|
||||||
.PHONY: lotus
|
.PHONY: lotus
|
||||||
@ -70,7 +73,7 @@ CLEAN+=lotus
|
|||||||
|
|
||||||
lotus-storage-miner: $(BUILD_DEPS)
|
lotus-storage-miner: $(BUILD_DEPS)
|
||||||
rm -f lotus-storage-miner
|
rm -f lotus-storage-miner
|
||||||
go build -o lotus-storage-miner ./cmd/lotus-storage-miner
|
go build $(GOFLAGS) -o lotus-storage-miner ./cmd/lotus-storage-miner
|
||||||
go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build
|
go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build
|
||||||
|
|
||||||
.PHONY: lotus-storage-miner
|
.PHONY: lotus-storage-miner
|
||||||
|
15
build/params_debug.go
Normal file
15
build/params_debug.go
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
// +build debug
|
||||||
|
|
||||||
|
package build
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
// Seconds
|
||||||
|
const BlockDelay = 2
|
||||||
|
|
||||||
|
// Blocks
|
||||||
|
const ProvingPeriodDuration uint64 = 40
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
os.Setenv("TRUST_PARAMS", "1")
|
||||||
|
}
|
9
build/params_devnet.go
Normal file
9
build/params_devnet.go
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
// +build !debug
|
||||||
|
|
||||||
|
package build
|
||||||
|
|
||||||
|
// Seconds
|
||||||
|
const BlockDelay = 12
|
||||||
|
|
||||||
|
// Blocks
|
||||||
|
const ProvingPeriodDuration uint64 = 300
|
@ -36,9 +36,6 @@ const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours
|
|||||||
// /////
|
// /////
|
||||||
// Consensus / Network
|
// Consensus / Network
|
||||||
|
|
||||||
// Seconds
|
|
||||||
const BlockDelay = 12
|
|
||||||
|
|
||||||
// Seconds
|
// Seconds
|
||||||
const AllowableClockDrift = BlockDelay * 2
|
const AllowableClockDrift = BlockDelay * 2
|
||||||
|
|
||||||
@ -59,9 +56,6 @@ const WRatioDen = 2
|
|||||||
// /////
|
// /////
|
||||||
// Proofs
|
// Proofs
|
||||||
|
|
||||||
// Blocks
|
|
||||||
const ProvingPeriodDuration uint64 = 300
|
|
||||||
|
|
||||||
// PoStChallangeTime sets the window in which post computation should happen
|
// PoStChallangeTime sets the window in which post computation should happen
|
||||||
// Blocks
|
// Blocks
|
||||||
const PoStChallangeTime = ProvingPeriodDuration - 6
|
const PoStChallangeTime = ProvingPeriodDuration - 6
|
@ -6,7 +6,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -302,9 +304,10 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
|
|||||||
e.lk.Lock()
|
e.lk.Lock()
|
||||||
defer e.lk.Unlock()
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
done, more, err := check(e.tsc.best())
|
ts := e.tsc.best()
|
||||||
|
done, more, err := check(ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
|
||||||
}
|
}
|
||||||
if done {
|
if done {
|
||||||
timeout = NoTimeout
|
timeout = NoTimeout
|
||||||
@ -335,6 +338,6 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, msg *types.Message) error {
|
func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, msg store.ChainMsg) error {
|
||||||
return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg))
|
return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg.VMMessage()))
|
||||||
}
|
}
|
||||||
|
@ -96,7 +96,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
|||||||
span.End()
|
span.End()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err)
|
log.Errorf("chain trigger (@H %d, called @ %d) failed: %+v", triggerH, ts.Height(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -5,28 +5,32 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e *calledEvents) CheckMsg(ctx context.Context, msg *types.Message, hnd CalledHandler) CheckFunc {
|
func (e *calledEvents) CheckMsg(ctx context.Context, smsg store.ChainMsg, hnd CalledHandler) CheckFunc {
|
||||||
|
msg := smsg.VMMessage()
|
||||||
|
|
||||||
return func(ts *types.TipSet) (done bool, more bool, err error) {
|
return func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||||
fa, err := e.cs.StateGetActor(ctx, msg.From, ts)
|
fa, err := e.cs.StateGetActor(ctx, msg.From, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, true, err
|
return false, true, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: probably want to look at the chain to make sure it's
|
// >= because actor nonce is actually the next nonce that is expected to appear on chain
|
||||||
// the right message, but this is probably good enough for now
|
if msg.Nonce >= fa.Nonce {
|
||||||
done = fa.Nonce >= msg.Nonce
|
return false, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
rec, err := e.cs.StateGetReceipt(ctx, msg.Cid(), ts)
|
rec, err := e.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, true, err
|
return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
more, err = hnd(msg, rec, ts, ts.Height())
|
more, err = hnd(msg, rec, ts, ts.Height())
|
||||||
|
|
||||||
return done, more, err
|
return true, more, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package chain
|
package chain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"sort"
|
"sort"
|
||||||
@ -8,6 +9,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
"github.com/ipfs/go-datastore/query"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
lps "github.com/whyrusleeping/pubsub"
|
lps "github.com/whyrusleeping/pubsub"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
@ -18,6 +22,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -35,6 +40,8 @@ var (
|
|||||||
const (
|
const (
|
||||||
msgTopic = "/fil/messages"
|
msgTopic = "/fil/messages"
|
||||||
|
|
||||||
|
localMsgsDs = "/mpool/local"
|
||||||
|
|
||||||
localUpdates = "update"
|
localUpdates = "update"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -60,6 +67,8 @@ type MessagePool struct {
|
|||||||
blsSigCache *lru.TwoQueueCache
|
blsSigCache *lru.TwoQueueCache
|
||||||
|
|
||||||
changes *lps.PubSub
|
changes *lps.PubSub
|
||||||
|
|
||||||
|
localMsgs datastore.Datastore
|
||||||
}
|
}
|
||||||
|
|
||||||
type msgSet struct {
|
type msgSet struct {
|
||||||
@ -89,7 +98,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
|
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||||
mp := &MessagePool{
|
mp := &MessagePool{
|
||||||
closer: make(chan struct{}),
|
closer: make(chan struct{}),
|
||||||
@ -99,10 +108,18 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
|
|||||||
sm: sm,
|
sm: sm,
|
||||||
ps: ps,
|
ps: ps,
|
||||||
minGasPrice: types.NewInt(0),
|
minGasPrice: types.NewInt(0),
|
||||||
maxTxPoolSize: 100000,
|
maxTxPoolSize: 5000,
|
||||||
blsSigCache: cache,
|
blsSigCache: cache,
|
||||||
changes: lps.New(50),
|
changes: lps.New(50),
|
||||||
|
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := mp.loadLocal(); err != nil {
|
||||||
|
return nil, xerrors.Errorf("loading local messages: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go mp.repubLocal()
|
||||||
|
|
||||||
sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
||||||
err := mp.HeadChange(rev, app)
|
err := mp.HeadChange(rev, app)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -111,7 +128,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
|
|||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
|
||||||
return mp
|
return mp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Close() error {
|
func (mp *MessagePool) Close() error {
|
||||||
@ -134,13 +151,13 @@ func (mp *MessagePool) repubLocal() {
|
|||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
msgb, err := msg.Serialize()
|
msgb, err := msg.Serialize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
|
errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = mp.ps.Publish(msgTopic, msgb)
|
err = mp.ps.Publish(msgTopic, msgb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
|
errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -156,8 +173,14 @@ func (mp *MessagePool) repubLocal() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addLocal(a address.Address) {
|
func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
|
||||||
mp.localAddrs[a] = struct{}{}
|
mp.localAddrs[m.Message.From] = struct{}{}
|
||||||
|
|
||||||
|
if err := mp.localMsgs.Put(datastore.NewKey(string(m.Cid().Bytes())), msgb); err != nil {
|
||||||
|
return xerrors.Errorf("persisting local message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Push(m *types.SignedMessage) error {
|
func (mp *MessagePool) Push(m *types.SignedMessage) error {
|
||||||
@ -171,7 +194,10 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
mp.addLocal(m.Message.From)
|
if err := mp.addLocal(m, msgb); err != nil {
|
||||||
|
mp.lk.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
mp.lk.Unlock()
|
mp.lk.Unlock()
|
||||||
|
|
||||||
return mp.ps.Publish(msgTopic, msgb)
|
return mp.ps.Publish(msgTopic, msgb)
|
||||||
@ -231,13 +257,20 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, err := mp.sm.ChainStore().PutMessage(&m.Message); err != nil {
|
||||||
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
mset, ok := mp.pending[m.Message.From]
|
mset, ok := mp.pending[m.Message.From]
|
||||||
if !ok {
|
if !ok {
|
||||||
mset = newMsgSet()
|
mset = newMsgSet()
|
||||||
mp.pending[m.Message.From] = mset
|
mp.pending[m.Message.From] = mset
|
||||||
}
|
}
|
||||||
|
|
||||||
mset.add(m)
|
if err := mset.add(m); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
mp.changes.Pub(api.MpoolUpdate{
|
mp.changes.Pub(api.MpoolUpdate{
|
||||||
Type: api.MpoolAdd,
|
Type: api.MpoolAdd,
|
||||||
@ -254,12 +287,23 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
|
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
|
||||||
|
stateNonce, err := mp.getStateNonce(addr) // sanity check
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
mset, ok := mp.pending[addr]
|
mset, ok := mp.pending[addr]
|
||||||
if ok {
|
if ok {
|
||||||
|
if stateNonce > mset.nextNonce {
|
||||||
|
log.Errorf("state nonce was larger than mset.nextNonce (%d > %d)", stateNonce, mset.nextNonce)
|
||||||
|
|
||||||
|
return stateNonce, nil
|
||||||
|
}
|
||||||
|
|
||||||
return mset.nextNonce, nil
|
return mset.nextNonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return mp.getStateNonce(addr)
|
return stateNonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
|
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
|
||||||
@ -302,7 +346,9 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ
|
|||||||
if err := mp.addLocked(msg); err != nil {
|
if err := mp.addLocked(msg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
mp.addLocal(msg.Message.From)
|
if err := mp.addLocal(msg, msgb); err != nil {
|
||||||
|
log.Errorf("addLocal failed: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
return msg, mp.ps.Publish(msgTopic, msgb)
|
return msg, mp.ps.Publish(msgTopic, msgb)
|
||||||
}
|
}
|
||||||
@ -328,8 +374,7 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
|||||||
delete(mset.msgs, nonce)
|
delete(mset.msgs, nonce)
|
||||||
|
|
||||||
if len(mset.msgs) == 0 {
|
if len(mset.msgs) == 0 {
|
||||||
// FIXME: This is racy
|
delete(mp.pending, from)
|
||||||
//delete(mp.pending, from)
|
|
||||||
} else {
|
} else {
|
||||||
var max uint64
|
var max uint64
|
||||||
for nonce := range mset.msgs {
|
for nonce := range mset.msgs {
|
||||||
@ -337,6 +382,10 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
|||||||
max = nonce
|
max = nonce
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if max < nonce {
|
||||||
|
max = nonce // we could have not seen the removed message before
|
||||||
|
}
|
||||||
|
|
||||||
mset.nextNonce = max + 1
|
mset.nextNonce = max + 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -380,7 +429,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
}
|
}
|
||||||
for _, msg := range smsgs {
|
for _, msg := range smsgs {
|
||||||
if err := mp.Add(msg); err != nil {
|
if err := mp.Add(msg); err != nil {
|
||||||
return err
|
log.Error(err) // TODO: probably lots of spam in multi-block tsets
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -388,7 +437,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
smsg := mp.RecoverSig(msg)
|
smsg := mp.RecoverSig(msg)
|
||||||
if smsg != nil {
|
if smsg != nil {
|
||||||
if err := mp.Add(smsg); err != nil {
|
if err := mp.Add(smsg); err != nil {
|
||||||
return err
|
log.Error(err) // TODO: probably lots of spam in multi-block tsets
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid())
|
log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid())
|
||||||
@ -423,7 +472,7 @@ func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
|
|||||||
}
|
}
|
||||||
sig, ok := val.(types.Signature)
|
sig, ok := val.(types.Signature)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warnf("value in signature cache was not a signature (got %T)", val)
|
log.Errorf("value in signature cache was not a signature (got %T)", val)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -456,3 +505,31 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err
|
|||||||
|
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) loadLocal() error {
|
||||||
|
res, err := mp.localMsgs.Query(query.Query{})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("query local messages: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for r := range res.Next() {
|
||||||
|
if r.Error != nil {
|
||||||
|
return xerrors.Errorf("r.Error: %w", r.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
var sm types.SignedMessage
|
||||||
|
if err := sm.UnmarshalCBOR(bytes.NewReader(r.Value)); err != nil {
|
||||||
|
return xerrors.Errorf("unmarshaling local message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := mp.Add(&sm); err != nil {
|
||||||
|
if xerrors.Is(err, ErrNonceTooLow) {
|
||||||
|
continue // todo: drop the message from local cache (if above certain confidence threshold)
|
||||||
|
}
|
||||||
|
|
||||||
|
return xerrors.Errorf("adding local message: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -322,7 +322,12 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.TipSet) (*types.MessageReceipt, error) {
|
func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.TipSet) (*types.MessageReceipt, error) {
|
||||||
r, err := sm.tipsetExecutedMessage(ts, msg)
|
m, err := sm.cs.GetCMessage(msg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to load message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r, err := sm.tipsetExecutedMessage(ts, msg, m.VMMessage())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -331,11 +336,6 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
|
|||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
m, err := sm.cs.GetCMessage(msg)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to load message: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, r, err = sm.searchBackForMsg(ctx, ts, m)
|
_, r, err = sm.searchBackForMsg(ctx, ts, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
|
return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
|
||||||
@ -368,7 +368,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*type
|
|||||||
return nil, nil, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type)
|
return nil, nil, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := sm.tipsetExecutedMessage(head[0].Val, mcid)
|
r, err := sm.tipsetExecutedMessage(head[0].Val, mcid, msg.VMMessage())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -403,7 +403,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*type
|
|||||||
case store.HCRevert:
|
case store.HCRevert:
|
||||||
continue
|
continue
|
||||||
case store.HCApply:
|
case store.HCApply:
|
||||||
r, err := sm.tipsetExecutedMessage(val.Val, mcid)
|
r, err := sm.tipsetExecutedMessage(val.Val, mcid, msg.VMMessage())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -454,7 +454,7 @@ func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet
|
|||||||
return nil, nil, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
|
return nil, nil, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := sm.tipsetExecutedMessage(ts, m.Cid())
|
r, err := sm.tipsetExecutedMessage(ts, m.Cid(), m.VMMessage())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("checking for message execution during lookback: %w", err)
|
return nil, nil, fmt.Errorf("checking for message execution during lookback: %w", err)
|
||||||
}
|
}
|
||||||
@ -467,7 +467,7 @@ func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*types.MessageReceipt, error) {
|
func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid, vmm *types.Message) (*types.MessageReceipt, error) {
|
||||||
// The genesis block did not execute any messages
|
// The genesis block did not execute any messages
|
||||||
if ts.Height() == 0 {
|
if ts.Height() == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@ -483,9 +483,24 @@ func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*t
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, m := range cm {
|
for ii := range cm {
|
||||||
if m.Cid() == msg {
|
// iterate in reverse because we going backwards through the chain
|
||||||
return sm.cs.GetParentReceipt(ts.Blocks()[0], i)
|
i := len(cm) - ii - 1
|
||||||
|
m := cm[i]
|
||||||
|
|
||||||
|
if m.VMMessage().From == vmm.From { // cheaper to just check origin first
|
||||||
|
if m.VMMessage().Nonce == vmm.Nonce {
|
||||||
|
if m.Cid() == msg {
|
||||||
|
return sm.cs.GetParentReceipt(ts.Blocks()[0], i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// this should be that message
|
||||||
|
return nil, xerrors.Errorf("found message with equal nonce as the one we are looking for (F:%s n %d, TS: %s n%d)",
|
||||||
|
msg, vmm.Nonce, m.Cid(), m.VMMessage().Nonce)
|
||||||
|
}
|
||||||
|
if m.VMMessage().Nonce < vmm.Nonce {
|
||||||
|
return nil, nil // don't bother looking further
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -552,6 +552,9 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (ChainMsg, error) {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
if err != bstore.ErrNotFound {
|
||||||
|
log.Warn("GetCMessage: unexpected error getting unsigned message: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
return cs.GetSignedMessage(c)
|
return cs.GetSignedMessage(c)
|
||||||
}
|
}
|
||||||
|
77
cli/mpool.go
77
cli/mpool.go
@ -4,7 +4,11 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
"gopkg.in/urfave/cli.v2"
|
"gopkg.in/urfave/cli.v2"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
var mpoolCmd = &cli.Command{
|
var mpoolCmd = &cli.Command{
|
||||||
@ -13,6 +17,7 @@ var mpoolCmd = &cli.Command{
|
|||||||
Subcommands: []*cli.Command{
|
Subcommands: []*cli.Command{
|
||||||
mpoolPending,
|
mpoolPending,
|
||||||
mpoolSub,
|
mpoolSub,
|
||||||
|
mpoolStat,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,3 +81,75 @@ var mpoolSub = &cli.Command{
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type statBucket struct {
|
||||||
|
msgs map[uint64]*types.SignedMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
var mpoolStat = &cli.Command{
|
||||||
|
Name: "stat",
|
||||||
|
Usage: "print mempool stats",
|
||||||
|
Action: func(cctx *cli.Context) error {
|
||||||
|
api, closer, err := GetFullNodeAPI(cctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
ctx := ReqContext(cctx)
|
||||||
|
|
||||||
|
ts, err := api.ChainHead(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting chain head: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := api.MpoolPending(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
buckets := map[address.Address]*statBucket{}
|
||||||
|
|
||||||
|
for _, v := range msgs {
|
||||||
|
bkt, ok := buckets[v.Message.From]
|
||||||
|
if !ok {
|
||||||
|
bkt = &statBucket{
|
||||||
|
msgs: map[uint64]*types.SignedMessage{},
|
||||||
|
}
|
||||||
|
buckets[v.Message.From] = bkt
|
||||||
|
}
|
||||||
|
|
||||||
|
bkt.msgs[v.Message.Nonce] = v
|
||||||
|
}
|
||||||
|
for a, bkt := range buckets {
|
||||||
|
act, err := api.StateGetActor(ctx, a, ts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cur := act.Nonce
|
||||||
|
for {
|
||||||
|
_, ok := bkt.msgs[cur]
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
cur++
|
||||||
|
}
|
||||||
|
|
||||||
|
past := 0
|
||||||
|
future := 0
|
||||||
|
for _, m := range bkt.msgs {
|
||||||
|
if m.Message.Nonce < act.Nonce {
|
||||||
|
past++
|
||||||
|
}
|
||||||
|
if m.Message.Nonce > cur {
|
||||||
|
future++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("%s, past: %d, cur: %d, future: %d\n", a, past, cur-act.Nonce, future)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
@ -377,7 +377,7 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs
|
|||||||
}
|
}
|
||||||
|
|
||||||
if msg.Message.Nonce < inclNonces[from] {
|
if msg.Message.Nonce < inclNonces[from] {
|
||||||
log.Warnf("message in mempool has already used nonce (%d < %d) %s", msg.Message.Nonce, inclNonces[from], msg.Cid())
|
log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,14 +41,17 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt
|
|||||||
return exch
|
return exch
|
||||||
}
|
}
|
||||||
|
|
||||||
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub) *chain.MessagePool {
|
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*chain.MessagePool, error) {
|
||||||
mp := chain.NewMessagePool(sm, ps)
|
mp, err := chain.NewMessagePool(sm, ps, ds)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
||||||
|
}
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStop: func(_ context.Context) error {
|
OnStop: func(_ context.Context) error {
|
||||||
return mp.Close()
|
return mp.Close()
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
return mp
|
return mp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ChainBlockstore(r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
|
func ChainBlockstore(r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -24,13 +23,13 @@ func (m *Miner) beginPosting(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
|
sppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to get proving period end for miner: %s", err)
|
log.Errorf("failed to get proving period end for miner (ts h: %d): %s", ts.Height(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if ppe == 0 {
|
if sppe == 0 {
|
||||||
log.Warn("Not proving yet")
|
log.Warn("Not proving yet")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -44,12 +43,16 @@ func (m *Miner) beginPosting(ctx context.Context) {
|
|||||||
|
|
||||||
// height needs to be +1, because otherwise we'd be trying to schedule PoSt
|
// height needs to be +1, because otherwise we'd be trying to schedule PoSt
|
||||||
// at current block height
|
// at current block height
|
||||||
ppe, _ = actors.ProvingPeriodEnd(ppe, ts.Height()+1)
|
ppe, _ := actors.ProvingPeriodEnd(sppe, ts.Height()+1)
|
||||||
m.schedPost = ppe
|
m.schedPost = ppe
|
||||||
|
|
||||||
m.postLk.Unlock()
|
m.postLk.Unlock()
|
||||||
|
|
||||||
log.Infof("Scheduling post at height %d", ppe-build.PoStChallangeTime)
|
if build.PoStChallangeTime > ppe {
|
||||||
|
ppe = build.PoStChallangeTime
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Scheduling post at height %d (begin ts: %d, statePPE: %d)", ppe-build.PoStChallangeTime, ts.Height(), sppe)
|
||||||
err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert
|
err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert
|
||||||
// TODO: Cancel post
|
// TODO: Cancel post
|
||||||
log.Errorf("TODO: Cancel PoSt, re-run")
|
log.Errorf("TODO: Cancel PoSt, re-run")
|
||||||
@ -115,8 +118,7 @@ type post struct {
|
|||||||
proof []byte
|
proof []byte
|
||||||
|
|
||||||
// commit
|
// commit
|
||||||
msg *types.Message
|
smsg *types.SignedMessage
|
||||||
smsg cid.Cid
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *post) doPost(ctx context.Context) error {
|
func (p *post) doPost(ctx context.Context) error {
|
||||||
@ -124,19 +126,19 @@ func (p *post) doPost(ctx context.Context) error {
|
|||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if err := p.preparePost(ctx); err != nil {
|
if err := p.preparePost(ctx); err != nil {
|
||||||
return err
|
return xerrors.Errorf("prepare: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.runPost(ctx); err != nil {
|
if err := p.runPost(ctx); err != nil {
|
||||||
return err
|
return xerrors.Errorf("run: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.commitPost(ctx); err != nil {
|
if err := p.commitPost(ctx); err != nil {
|
||||||
return err
|
return xerrors.Errorf("commit: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.waitCommit(ctx); err != nil {
|
if err := p.waitCommit(ctx); err != nil {
|
||||||
return err
|
return xerrors.Errorf("wait: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -149,7 +151,7 @@ func (p *post) preparePost(ctx context.Context) error {
|
|||||||
|
|
||||||
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
|
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to get proving set for miner: %w", err)
|
return xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", p.ts.Height(), err)
|
||||||
}
|
}
|
||||||
if len(sset) == 0 {
|
if len(sset) == 0 {
|
||||||
log.Warn("empty proving set! (ts.H: %d)", p.ts.Height())
|
log.Warn("empty proving set! (ts.H: %d)", p.ts.Height())
|
||||||
@ -209,7 +211,7 @@ func (p *post) runPost(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *post) commitPost(ctx context.Context) error {
|
func (p *post) commitPost(ctx context.Context) (err error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
|
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
@ -223,7 +225,7 @@ func (p *post) commitPost(ctx context.Context) error {
|
|||||||
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
|
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.msg = &types.Message{
|
msg := &types.Message{
|
||||||
To: p.m.maddr,
|
To: p.m.maddr,
|
||||||
From: p.m.worker,
|
From: p.m.worker,
|
||||||
Method: actors.MAMethods.SubmitPoSt,
|
Method: actors.MAMethods.SubmitPoSt,
|
||||||
@ -235,11 +237,10 @@ func (p *post) commitPost(ctx context.Context) error {
|
|||||||
|
|
||||||
log.Info("mpush")
|
log.Info("mpush")
|
||||||
|
|
||||||
smsg, err := p.m.api.MpoolPushMessage(ctx, p.msg)
|
p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("pushing message to mpool: %w", err)
|
return xerrors.Errorf("pushing message to mpool: %w", err)
|
||||||
}
|
}
|
||||||
p.smsg = smsg.Cid()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -248,7 +249,7 @@ func (p *post) waitCommit(ctx context.Context) error {
|
|||||||
ctx, span := trace.StartSpan(ctx, "storage.waitPost")
|
ctx, span := trace.StartSpan(ctx, "storage.waitPost")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
log.Infof("Waiting for post %s to appear on chain", p.smsg)
|
log.Infof("Waiting for post %s to appear on chain", p.smsg.Cid())
|
||||||
|
|
||||||
err := p.m.events.CalledMsg(ctx, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
|
err := p.m.events.CalledMsg(ctx, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
|
||||||
if rec.ExitCode != 0 {
|
if rec.ExitCode != 0 {
|
||||||
@ -261,9 +262,9 @@ func (p *post) waitCommit(ctx context.Context) error {
|
|||||||
}, func(ctx context.Context, ts *types.TipSet) error {
|
}, func(ctx context.Context, ts *types.TipSet) error {
|
||||||
log.Warn("post message reverted")
|
log.Warn("post message reverted")
|
||||||
return nil
|
return nil
|
||||||
}, 3, postMsgTimeout, p.msg)
|
}, 3, postMsgTimeout, p.smsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return xerrors.Errorf("waiting for post to appear on chain: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -284,19 +285,13 @@ func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipS
|
|||||||
ppe: ppe,
|
ppe: ppe,
|
||||||
ts: ts,
|
ts: ts,
|
||||||
}).doPost(ctx)
|
}).doPost(ctx)
|
||||||
|
|
||||||
|
m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("doPost: %w", err)
|
return xerrors.Errorf("doPost: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sectorIdList(si []*api.ChainSectorInfo) []uint64 {
|
|
||||||
out := make([]uint64, len(si))
|
|
||||||
for i, s := range si {
|
|
||||||
out[i] = s.SectorID
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user