Merge pull request #701 from filecoin-project/fix/miner-message-filtering

fix miner message filter nonce checking
This commit is contained in:
Łukasz Magiera 2019-12-03 22:36:06 +01:00 committed by GitHub
commit 3c271db802
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 199 additions and 86 deletions

View File

@ -62,7 +62,7 @@ func (tsc *tipSetCache) add(ts *types.TipSet) error {
func (tsc *tipSetCache) revert(ts *types.TipSet) error { func (tsc *tipSetCache) revert(ts *types.TipSet) error {
if tsc.len == 0 { if tsc.len == 0 {
return xerrors.New("tipSetCache.revert: nothing to revert; cache is empty") return nil // this can happen, and it's fine
} }
if !tsc.cache[tsc.start].Equals(ts) { if !tsc.cache[tsc.start].Equals(ts) {
@ -92,7 +92,8 @@ func (tsc *tipSetCache) getNonNull(height uint64) (*types.TipSet, error) {
func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) { func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
if tsc.len == 0 { if tsc.len == 0 {
return nil, xerrors.New("tipSetCache.get: cache is empty") log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
return tsc.storage(context.TODO(), height, nil)
} }
headH := tsc.cache[tsc.start].Height() headH := tsc.cache[tsc.start].Height()

View File

@ -261,13 +261,17 @@ func (cg *ChainGen) nextBlockProof(ctx context.Context, pts *types.TipSet, m add
VRFProof: vrfout, VRFProof: vrfout,
} }
win, eproof, err := IsRoundWinner(ctx, pts, round, m, cg.eppProvs[m], &mca{w: cg.w, sm: cg.sm}) eproofin, err := IsRoundWinner(ctx, pts, round, m, cg.eppProvs[m], &mca{w: cg.w, sm: cg.sm})
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("checking round winner failed: %w", err) return nil, nil, xerrors.Errorf("checking round winner failed: %w", err)
} }
if !win { if eproofin == nil {
return nil, tick, nil return nil, tick, nil
} }
eproof, err := ComputeProof(ctx, cg.eppProvs[m], eproofin)
if err != nil {
return nil, nil, xerrors.Errorf("computing proof: %w", err)
}
return eproof, tick, nil return eproof, tick, nil
} }
@ -466,28 +470,35 @@ func (epp *eppProvider) ComputeProof(ctx context.Context, _ sectorbuilder.Sorted
return []byte("valid proof"), nil return []byte("valid proof"), nil
} }
func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *types.EPostProof, error) { type ProofInput struct {
sectors sectorbuilder.SortedPublicSectorInfo
hvrf []byte
winners []sectorbuilder.EPostCandidate
vrfout []byte
}
func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (*ProofInput, error) {
r, err := a.ChainGetRandomness(ctx, ts.Key(), round-build.EcRandomnessLookback) r, err := a.ChainGetRandomness(ctx, ts.Key(), round-build.EcRandomnessLookback)
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("chain get randomness: %w", err) return nil, xerrors.Errorf("chain get randomness: %w", err)
} }
mworker, err := a.StateMinerWorker(ctx, miner, ts) mworker, err := a.StateMinerWorker(ctx, miner, ts)
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("failed to get miner worker: %w", err) return nil, xerrors.Errorf("failed to get miner worker: %w", err)
} }
vrfout, err := ComputeVRF(ctx, a.WalletSign, mworker, miner, DSepElectionPost, r) vrfout, err := ComputeVRF(ctx, a.WalletSign, mworker, miner, DSepElectionPost, r)
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("failed to compute VRF: %w", err) return nil, xerrors.Errorf("failed to compute VRF: %w", err)
} }
pset, err := a.StateMinerProvingSet(ctx, miner, ts) pset, err := a.StateMinerProvingSet(ctx, miner, ts)
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("failed to load proving set for miner: %w", err) return nil, xerrors.Errorf("failed to load proving set for miner: %w", err)
} }
if len(pset) == 0 { if len(pset) == 0 {
return false, nil, nil return nil, nil
} }
var sinfos []ffi.PublicSectorInfo var sinfos []ffi.PublicSectorInfo
@ -504,17 +515,17 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add
hvrf := sha256.Sum256(vrfout) hvrf := sha256.Sum256(vrfout)
candidates, err := epp.GenerateCandidates(ctx, sectors, hvrf[:]) candidates, err := epp.GenerateCandidates(ctx, sectors, hvrf[:])
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("failed to generate electionPoSt candidates: %w", err) return nil, xerrors.Errorf("failed to generate electionPoSt candidates: %w", err)
} }
pow, err := a.StateMinerPower(ctx, miner, ts) pow, err := a.StateMinerPower(ctx, miner, ts)
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("failed to check power: %w", err) return nil, xerrors.Errorf("failed to check power: %w", err)
} }
ssize, err := a.StateMinerSectorSize(ctx, miner, ts) ssize, err := a.StateMinerSectorSize(ctx, miner, ts)
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("failed to look up miners sector size: %w", err) return nil, xerrors.Errorf("failed to look up miners sector size: %w", err)
} }
var winners []sectorbuilder.EPostCandidate var winners []sectorbuilder.EPostCandidate
@ -526,19 +537,28 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add
// no winners, sad // no winners, sad
if len(winners) == 0 { if len(winners) == 0 {
return false, nil, nil return nil, nil
} }
proof, err := epp.ComputeProof(ctx, sectors, hvrf[:], winners) return &ProofInput{
sectors: sectors,
hvrf: hvrf[:],
winners: winners,
vrfout: vrfout,
}, nil
}
func ComputeProof(ctx context.Context, epp ElectionPoStProver, pi *ProofInput) (*types.EPostProof, error) {
proof, err := epp.ComputeProof(ctx, pi.sectors, pi.hvrf, pi.winners)
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("failed to compute snark for election proof: %w", err) return nil, xerrors.Errorf("failed to compute snark for election proof: %w", err)
} }
ept := types.EPostProof{ ept := types.EPostProof{
Proof: proof, Proof: proof,
PostRand: vrfout, PostRand: pi.vrfout,
} }
for _, win := range winners { for _, win := range pi.winners {
ept.Candidates = append(ept.Candidates, types.EPostTicket{ ept.Candidates = append(ept.Candidates, types.EPostTicket{
Partial: win.PartialTicket[:], Partial: win.PartialTicket[:],
SectorID: win.SectorID, SectorID: win.SectorID,
@ -546,7 +566,7 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add
}) })
} }
return true, &ept, nil return &ept, nil
} }
type SignFunc func(context.Context, address.Address, []byte) (*types.Signature, error) type SignFunc func(context.Context, address.Address, []byte) (*types.Signature, error)

View File

@ -62,7 +62,7 @@ type MessagePool struct {
pending map[address.Address]*msgSet pending map[address.Address]*msgSet
pendingCount int pendingCount int
curTsLk sync.Mutex curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
curTs *types.TipSet curTs *types.TipSet
api Provider api Provider
@ -106,7 +106,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
} }
type Provider interface { type Provider interface {
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
PutMessage(m store.ChainMsg) (cid.Cid, error) PutMessage(m store.ChainMsg) (cid.Cid, error)
PubSubPublish(string, []byte) error PubSubPublish(string, []byte) error
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
@ -124,8 +124,9 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
return &mpoolProvider{sm, ps} return &mpoolProvider{sm, ps}
} }
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) { func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
mpp.sm.ChainStore().SubscribeHeadChanges(cb) mpp.sm.ChainStore().SubscribeHeadChanges(cb)
return mpp.sm.ChainStore().GetHeaviestTipSet()
} }
func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) { func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) {
@ -173,7 +174,7 @@ func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) {
go mp.repubLocal() go mp.repubLocal()
api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(rev, app) err := mp.HeadChange(rev, app)
if err != nil { if err != nil {
log.Errorf("mpool head notif handler error: %+v", err) log.Errorf("mpool head notif handler error: %+v", err)
@ -257,6 +258,12 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
} }
func (mp *MessagePool) Add(m *types.SignedMessage) error { func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
return mp.addTs(m, mp.curTs)
}
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
// big messages are bad, anti DOS // big messages are bad, anti DOS
if m.Size() > 32*1024 { if m.Size() > 32*1024 {
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
@ -275,7 +282,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
return err return err
} }
snonce, err := mp.getStateNonce(m.Message.From) snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil { if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %w", err) return xerrors.Errorf("failed to look up actor state nonce: %w", err)
} }
@ -333,14 +340,17 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
} }
func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
return mp.getNonceLocked(addr) return mp.getNonceLocked(addr, mp.curTs)
} }
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) {
stateNonce, err := mp.getStateNonce(addr) // sanity check stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -359,22 +369,9 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
return stateNonce, nil return stateNonce, nil
} }
func (mp *MessagePool) setCurTipset(ts *types.TipSet) { func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTs = ts
}
func (mp *MessagePool) getCurTipset() *types.TipSet {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
return mp.curTs
}
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
// TODO: this method probably should be cached // TODO: this method probably should be cached
curTs := mp.getCurTipset()
act, err := mp.api.StateGetActor(addr, curTs) act, err := mp.api.StateGetActor(addr, curTs)
if err != nil { if err != nil {
return 0, err return 0, err
@ -417,13 +414,16 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro
} }
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) { func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
if addr.Protocol() == address.ID { if addr.Protocol() == address.ID {
log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr) log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr)
} }
nonce, err := mp.getNonceLocked(addr) nonce, err := mp.getNonceLocked(addr, mp.curTs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -485,15 +485,19 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
} }
} }
func (mp *MessagePool) Pending() []*types.SignedMessage { func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
out := make([]*types.SignedMessage, 0) out := make([]*types.SignedMessage, 0)
for a := range mp.pending { for a := range mp.pending {
out = append(out, mp.pendingFor(a)...) out = append(out, mp.pendingFor(a)...)
} }
return out return out, mp.curTs
} }
func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
@ -516,6 +520,8 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
} }
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
for _, ts := range revert { for _, ts := range revert {
pts, err := mp.api.LoadTipSet(ts.Parents()) pts, err := mp.api.LoadTipSet(ts.Parents())
@ -523,27 +529,16 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
return err return err
} }
mp.setCurTipset(pts) msgs, err := mp.MessagesForBlocks(ts.Blocks())
for _, b := range ts.Blocks() { if err != nil {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b) return err
if err != nil { }
return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err)
}
for _, msg := range smsgs {
if err := mp.Add(msg); err != nil {
log.Error(err) // TODO: probably lots of spam in multi-block tsets
}
}
for _, msg := range bmsgs { mp.curTs = pts
smsg := mp.RecoverSig(msg)
if smsg != nil { for _, msg := range msgs {
if err := mp.Add(smsg); err != nil { if err := mp.addTs(msg, pts); err != nil {
log.Error(err) // TODO: probably lots of spam in multi-block tsets log.Error(err) // TODO: probably lots of spam in multi-block tsets
}
} else {
log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid())
}
} }
} }
} }
@ -562,12 +557,38 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
mp.Remove(msg.From, msg.Nonce) mp.Remove(msg.From, msg.Nonce)
} }
} }
mp.setCurTipset(ts)
mp.curTs = ts
} }
return nil return nil
} }
func (mp *MessagePool) MessagesForBlocks(blks []*types.BlockHeader) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, 0)
for _, b := range blks {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
if err != nil {
return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
}
for _, msg := range smsgs {
out = append(out, msg)
}
for _, msg := range bmsgs {
smsg := mp.RecoverSig(msg)
if smsg != nil {
out = append(out, smsg)
} else {
log.Warnf("could not recover signature for bls message %s", msg.Cid())
}
}
}
return out, nil
}
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage { func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
val, ok := mp.blsSigCache.Get(msg.Cid()) val, ok := mp.blsSigCache.Get(msg.Cid())
if !ok { if !ok {

View File

@ -52,8 +52,9 @@ func (tma *testMpoolApi) setBlockMessages(h *types.BlockHeader, msgs ...*types.S
tma.tipsets = append(tma.tipsets, mock.TipSet(h)) tma.tipsets = append(tma.tipsets, mock.TipSet(h))
} }
func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) { func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
tma.cb = cb tma.cb = cb
return nil
} }
func (tma *testMpoolApi) PutMessage(m store.ChainMsg) (cid.Cid, error) { func (tma *testMpoolApi) PutMessage(m store.ChainMsg) (cid.Cid, error) {
@ -216,7 +217,8 @@ func TestRevertMessages(t *testing.T) {
assertNonce(t, mp, sender, 4) assertNonce(t, mp, sender, 4)
if len(mp.Pending()) != 3 { p, _ := mp.Pending()
if len(p) != 3 {
t.Fatal("expected three messages in mempool") t.Fatal("expected three messages in mempool")
} }

View File

@ -195,6 +195,7 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl
} }
if applied[m.From] != m.Nonce { if applied[m.From] != m.Nonce {
log.Infof("skipping message from %s: nonce check failed: exp %d, was %d", m.From, applied[m.From], m.Nonce)
continue continue
} }
applied[m.From]++ applied[m.From]++

View File

@ -264,17 +264,28 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
return nil, xerrors.Errorf("scratching ticket failed: %w", err) return nil, xerrors.Errorf("scratching ticket failed: %w", err)
} }
win, proof, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api) proofin, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to check if we win next round: %w", err) return nil, xerrors.Errorf("failed to check if we win next round: %w", err)
} }
if !win { if proofin == nil {
base.nullRounds++ base.nullRounds++
return nil, nil return nil, nil
} }
b, err := m.createBlock(base, addr, ticket, proof) // get pending messages early,
pending, err := m.api.MpoolPending(context.TODO(), base.ts)
if err != nil {
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
}
proof, err := gen.ComputeProof(ctx, m.epp, proofin)
if err != nil {
return nil, xerrors.Errorf("computing election proof: %w", err)
}
b, err := m.createBlock(base, addr, ticket, proof, pending)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to create block: %w", err) return nil, xerrors.Errorf("failed to create block: %w", err)
} }
@ -334,13 +345,7 @@ func (m *Miner) computeTicket(ctx context.Context, addr address.Address, base *M
}, nil }, nil
} }
func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof) (*types.BlockMsg, error) { func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof, pending []*types.SignedMessage) (*types.BlockMsg, error) {
pending, err := m.api.MpoolPending(context.TODO(), base.ts)
if err != nil {
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
}
msgs, err := selectMessages(context.TODO(), m.api.StateGetActor, base, pending) msgs, err := selectMessages(context.TODO(), m.api.StateGetActor, base, pending)
if err != nil { if err != nil {
return nil, xerrors.Errorf("message filtering failed: %w", err) return nil, xerrors.Errorf("message filtering failed: %w", err)
@ -356,10 +361,20 @@ func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *type
type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error)
func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
for _, msg := range msgs {
if msg.Message.From == from {
out++
}
}
return out
}
func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) { func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, 0, len(msgs)) out := make([]*types.SignedMessage, 0, len(msgs))
inclNonces := make(map[address.Address]uint64) inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]types.BigInt) inclBalances := make(map[address.Address]types.BigInt)
for _, msg := range msgs { for _, msg := range msgs {
if msg.Message.To == address.Undef { if msg.Message.To == address.Undef {
log.Warnf("message in mempool had bad 'To' address") log.Warnf("message in mempool had bad 'To' address")
@ -367,12 +382,13 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs
} }
from := msg.Message.From from := msg.Message.From
act, err := al(ctx, from, base.ts)
if err != nil {
return nil, xerrors.Errorf("failed to check message sender balance: %w", err)
}
if _, ok := inclNonces[from]; !ok { if _, ok := inclNonces[from]; !ok {
act, err := al(ctx, from, base.ts)
if err != nil {
return nil, xerrors.Errorf("failed to check message sender balance: %w", err)
}
inclNonces[from] = act.Nonce inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance inclBalances[from] = act.Balance
} }
@ -383,12 +399,12 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs
} }
if msg.Message.Nonce > inclNonces[from] { if msg.Message.Nonce > inclNonces[from] {
log.Warnf("message in mempool has too high of a nonce (%d > %d) %s", msg.Message.Nonce, inclNonces[from], msg.Cid()) log.Warnf("message in mempool has too high of a nonce (%d > %d) %s (%d pending for orig)", msg.Message.Nonce, inclNonces[from], msg.Cid(), countFrom(msgs, from))
continue continue
} }
if msg.Message.Nonce < inclNonces[from] { if msg.Message.Nonce < inclNonces[from] {
log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid()) log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s (%d pending for)", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid(), countFrom(msgs, from))
continue continue
} }

View File

@ -3,12 +3,14 @@ package full
import ( import (
"context" "context"
"github.com/ipfs/go-cid"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -17,13 +19,63 @@ type MpoolAPI struct {
WalletAPI WalletAPI
Chain *store.ChainStore
Mpool *messagepool.MessagePool Mpool *messagepool.MessagePool
} }
func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
// TODO: need to make sure we don't return messages that were already included in the referenced chain pending, mpts := a.Mpool.Pending()
// also need to accept ts == nil just fine, assume nil == chain.Head()
return a.Mpool.Pending(), nil haveCids := map[cid.Cid]struct{}{}
for _, m := range pending {
haveCids[m.Cid()] = struct{}{}
}
if ts == nil || mpts.Height() > ts.Height() {
return pending, nil
}
for {
if mpts.Height() == ts.Height() {
if mpts.Equals(ts) {
return pending, nil
}
// different blocks in tipsets
have, err := a.Mpool.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
}
for _, m := range have {
haveCids[m.Cid()] = struct{}{}
}
}
msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
for _, m := range msgs {
if _, ok := haveCids[m.Cid()]; ok {
continue
}
haveCids[m.Cid()] = struct{}{}
pending = append(pending, m)
}
if mpts.Height() >= ts.Height() {
return pending, nil
}
ts, err = a.Chain.LoadTipSet(ts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading parent tipset: %w", err)
}
}
} }
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error { func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error {