more correctly handle discrepancies between mempools head and the mining base
This commit is contained in:
parent
d789ca9299
commit
cf3298cd04
@ -27,6 +27,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
"github.com/filecoin-project/lotus/lib/sigs"
|
"github.com/filecoin-project/lotus/lib/sigs"
|
||||||
@ -880,6 +881,73 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
return merr
|
return merr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) runHeadChange(from *types.TipSet, to *types.TipSet, rmsgs map[address.Address]map[uint64]*types.SignedMessage) error {
|
||||||
|
add := func(m *types.SignedMessage) {
|
||||||
|
s, ok := rmsgs[m.Message.From]
|
||||||
|
if !ok {
|
||||||
|
s = make(map[uint64]*types.SignedMessage)
|
||||||
|
rmsgs[m.Message.From] = s
|
||||||
|
}
|
||||||
|
s[m.Message.Nonce] = m
|
||||||
|
}
|
||||||
|
rm := func(from address.Address, nonce uint64) {
|
||||||
|
s, ok := rmsgs[from]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := s[nonce]; ok {
|
||||||
|
delete(s, nonce)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
revert, apply, err := store.ReorgOps(mp.api.LoadTipSet, from, to)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to compute reorg ops for mpool pending messages: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var merr error
|
||||||
|
|
||||||
|
for _, ts := range revert {
|
||||||
|
msgs, err := mp.MessagesForBlocks(ts.Blocks())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error retrieving messages for reverted block: %s", err)
|
||||||
|
merr = multierror.Append(merr, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msg := range msgs {
|
||||||
|
add(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ts := range apply {
|
||||||
|
mp.curTs = ts
|
||||||
|
|
||||||
|
for _, b := range ts.Blocks() {
|
||||||
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
||||||
|
if err != nil {
|
||||||
|
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
||||||
|
log.Errorf("error retrieving messages for block: %s", xerr)
|
||||||
|
merr = multierror.Append(merr, xerr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msg := range smsgs {
|
||||||
|
rm(msg.Message.From, msg.Message.Nonce)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msg := range bmsgs {
|
||||||
|
rm(msg.From, msg.Nonce)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return merr
|
||||||
|
}
|
||||||
|
|
||||||
type statBucket struct {
|
type statBucket struct {
|
||||||
msgs map[uint64]*types.SignedMessage
|
msgs map[uint64]*types.SignedMessage
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
abig "github.com/filecoin-project/specs-actors/actors/abi/big"
|
abig "github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
|
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
|
||||||
@ -528,7 +527,6 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
|
|||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
result := make(map[address.Address]map[uint64]*types.SignedMessage)
|
result := make(map[address.Address]map[uint64]*types.SignedMessage)
|
||||||
haveCids := make(map[cid.Cid]struct{})
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if dt := time.Since(start); dt > time.Millisecond {
|
if dt := time.Since(start); dt > time.Millisecond {
|
||||||
log.Infow("get pending messages done", "took", dt)
|
log.Infow("get pending messages done", "took", dt)
|
||||||
@ -554,10 +552,6 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
|
|||||||
}
|
}
|
||||||
result[a] = msetCopy
|
result[a] = msetCopy
|
||||||
|
|
||||||
// mark the messages as seen
|
|
||||||
for _, m := range mset.msgs {
|
|
||||||
haveCids[m.Cid()] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -566,72 +560,11 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nope, we need to sync the tipsets
|
if err := mp.runHeadChange(curTs, ts, result); err != nil {
|
||||||
for {
|
return nil, xerrors.Errorf("failed to process difference between mpool head and given head: %w", err)
|
||||||
if curTs.Height() == ts.Height() {
|
|
||||||
if curTs.Equals(ts) {
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// different blocks in tipsets -- we mark them as seen so that they are not included in
|
|
||||||
// in the message set we return, but *neither me (vyzo) nor why understand why*
|
|
||||||
// this code is also probably completely untested in production, so I am adding a big fat
|
|
||||||
// warning to revisit this case and sanity check this decision.
|
|
||||||
log.Warnf("mpool tipset has same height as target tipset but it's not equal; beware of dragons!")
|
|
||||||
|
|
||||||
have, err := mp.MessagesForBlocks(ts.Blocks())
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range have {
|
|
||||||
haveCids[m.Cid()] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
msgs, err := mp.MessagesForBlocks(ts.Blocks())
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range msgs {
|
|
||||||
if _, have := haveCids[m.Cid()]; have {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
haveCids[m.Cid()] = struct{}{}
|
|
||||||
mset, ok := result[m.Message.From]
|
|
||||||
if !ok {
|
|
||||||
mset = make(map[uint64]*types.SignedMessage)
|
|
||||||
result[m.Message.From] = mset
|
|
||||||
}
|
|
||||||
|
|
||||||
other, dupNonce := mset[m.Message.Nonce]
|
|
||||||
if dupNonce {
|
|
||||||
// duplicate nonce, selfishly keep the message with the highest GasPrice
|
|
||||||
// if the gas prices are the same, keep the one with the highest GasLimit
|
|
||||||
switch m.Message.GasPremium.Int.Cmp(other.Message.GasPremium.Int) {
|
|
||||||
case 0:
|
|
||||||
if m.Message.GasLimit > other.Message.GasLimit {
|
|
||||||
mset[m.Message.Nonce] = m
|
|
||||||
}
|
|
||||||
case 1:
|
|
||||||
mset[m.Message.Nonce] = m
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
mset[m.Message.Nonce] = m
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if curTs.Height() >= ts.Height() {
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ts, err = mp.api.LoadTipSet(ts.Parents())
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("error loading parent tipset: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int {
|
func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int {
|
||||||
@ -671,7 +604,11 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
|||||||
// cannot exceed the block limit; drop all messages that exceed the limit
|
// cannot exceed the block limit; drop all messages that exceed the limit
|
||||||
// - the total gasReward cannot exceed the actor's balance; drop all messages that exceed
|
// - the total gasReward cannot exceed the actor's balance; drop all messages that exceed
|
||||||
// the balance
|
// the balance
|
||||||
a, _ := mp.api.StateGetActor(actor, ts)
|
a, err := mp.api.StateGetActor(actor, ts)
|
||||||
|
if err != nil {
|
||||||
|
panic(err) // TODO
|
||||||
|
}
|
||||||
|
|
||||||
curNonce := a.Nonce
|
curNonce := a.Nonce
|
||||||
balance := a.Balance.Int
|
balance := a.Balance.Int
|
||||||
gasLimit := int64(0)
|
gasLimit := int64(0)
|
||||||
|
@ -483,6 +483,10 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
|
func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
|
||||||
|
return ReorgOps(cs.LoadTipSet, a, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ReorgOps(lts func(types.TipSetKey) (*types.TipSet, error), a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
|
||||||
left := a
|
left := a
|
||||||
right := b
|
right := b
|
||||||
|
|
||||||
@ -490,7 +494,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti
|
|||||||
for !left.Equals(right) {
|
for !left.Equals(right) {
|
||||||
if left.Height() > right.Height() {
|
if left.Height() > right.Height() {
|
||||||
leftChain = append(leftChain, left)
|
leftChain = append(leftChain, left)
|
||||||
par, err := cs.LoadTipSet(left.Parents())
|
par, err := lts(left.Parents())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -498,7 +502,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti
|
|||||||
left = par
|
left = par
|
||||||
} else {
|
} else {
|
||||||
rightChain = append(rightChain, right)
|
rightChain = append(rightChain, right)
|
||||||
par, err := cs.LoadTipSet(right.Parents())
|
par, err := lts(right.Parents())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Infof("failed to fetch right.Parents: %s", err)
|
log.Infof("failed to fetch right.Parents: %s", err)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@ -509,6 +513,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti
|
|||||||
}
|
}
|
||||||
|
|
||||||
return leftChain, rightChain, nil
|
return leftChain, rightChain, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head).
|
// GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head).
|
||||||
|
Loading…
Reference in New Issue
Block a user