Merge pull request #1752 from filecoin-project/fix/mpool-slowness

miner: Optimize SelectMessages
This commit is contained in:
Whyrusleeping 2020-05-15 10:48:06 -07:00 committed by GitHub
commit d6bb0500df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 91 additions and 5 deletions

View File

@ -166,7 +166,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
}
// sync with fake chainstore (for tests)
if fcs, ok := e.api.(interface{notifDone()}); ok {
if fcs, ok := e.api.(interface{ notifDone() }); ok {
fcs.notifDone()
}
}

View File

@ -671,7 +671,7 @@ func TestCalled(t *testing.T) {
0: n2msg,
})
require.Equal(t, true, applied) // msg from H=7, which had reverted execution
require.Equal(t, true, applied) // msg from H=7, which had reverted execution
require.Equal(t, false, reverted)
require.Equal(t, abi.ChainEpoch(10), appliedH)
applied = false

View File

@ -262,6 +262,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil
}
tMBI := time.Now()
beaconPrev := mbi.PrevBeaconEntry
bvals, err := beacon.BeaconEntriesForBlock(ctx, m.beacon, round, beaconPrev)
@ -269,6 +271,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, xerrors.Errorf("get beacon entries failed: %w", err)
}
tDrand := time.Now()
hasPower, err := m.hasPower(ctx, m.address, base.TipSet)
if err != nil {
return nil, xerrors.Errorf("checking if miner is slashed: %w", err)
@ -279,6 +283,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil
}
tPowercheck := time.Now()
log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(time.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds)
rbase := beaconPrev
@ -301,6 +307,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil
}
tTicket := time.Now()
buf := new(bytes.Buffer)
if err := m.address.MarshalCBOR(buf); err != nil {
return nil, xerrors.Errorf("failed to marshal miner address: %w", err)
@ -313,6 +321,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
prand := abi.PoStRandomness(rand)
tSeed := time.Now()
postProof, err := m.epp.ComputeProof(ctx, mbi.Sectors, prand)
if err != nil {
return nil, xerrors.Errorf("failed to compute winning post proof: %w", err)
@ -324,16 +334,27 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
}
tPending := time.Now()
// TODO: winning post proof
b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending)
if err != nil {
return nil, xerrors.Errorf("failed to create block: %w", err)
}
dur := time.Since(start)
tCreateBlock := time.Now()
dur := tCreateBlock.Sub(start)
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur)
if dur > time.Second*build.BlockDelay {
log.Warn("CAUTION: block production took longer than the block delay. Your computer may not be fast enough to keep up")
log.Warnw("tMinerBaseInfo ", "duration", tMBI.Sub(start))
log.Warnw("tDrand ", "duration", tDrand.Sub(tMBI))
log.Warnw("tPowercheck ", "duration", tPowercheck.Sub(tDrand))
log.Warnw("tTicket ", "duration", tTicket.Sub(tPowercheck))
log.Warnw("tSeed ", "duration", tSeed.Sub(tTicket))
log.Warnw("tPending ", "duration", tPending.Sub(tSeed))
log.Warnw("tCreateBlock ", "duration", tCreateBlock.Sub(tPending))
}
return b, nil
@ -403,6 +424,34 @@ func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *type
})
}
type actCacheEntry struct {
act *types.Actor
err error
}
type cachedActorLookup struct {
tsk types.TipSetKey
cache map[address.Address]actCacheEntry
fallback ActorLookup
}
func (c *cachedActorLookup) StateGetActor(ctx context.Context, a address.Address, tsk types.TipSetKey) (*types.Actor, error) {
if c.tsk == tsk {
e, has := c.cache[a]
if has {
return e.act, e.err
}
}
e, err := c.fallback(ctx, a, tsk)
if c.tsk == tsk {
c.cache[a] = actCacheEntry{
act: e, err: err,
}
}
return e, err
}
type ActorLookup func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error)
func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
@ -415,12 +464,26 @@ func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
}
func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) {
al = (&cachedActorLookup{
tsk: ts.Key(),
cache: map[address.Address]actCacheEntry{},
fallback: al,
}).StateGetActor
out := make([]*types.SignedMessage, 0, build.BlockMessageLimit)
inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]types.BigInt)
inclCount := make(map[address.Address]int)
tooLowFundMsgs := 0
tooHighNonceMsgs := 0
start := time.Now()
vmValid := time.Duration(0)
getbal := time.Duration(0)
for _, msg := range msgs {
vmstart := time.Now()
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
if err := msg.VMMessage().ValidForBlockInclusion(minGas); err != nil {
@ -428,6 +491,8 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
continue
}
vmValid += time.Since(vmstart)
// TODO: this should be in some more general 'validate message' call
if msg.Message.GasLimit > build.BlockGasLimit {
log.Warnf("message in mempool had too high of a gas limit (%d)", msg.Message.GasLimit)
@ -441,6 +506,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
from := msg.Message.From
getBalStart := time.Now()
if _, ok := inclNonces[from]; !ok {
act, err := al(ctx, from, ts.Key())
if err != nil {
@ -451,14 +517,16 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance
}
getbal += time.Since(getBalStart)
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
log.Warnf("message in mempool does not have enough funds: %s", msg.Cid())
tooLowFundMsgs++
// todo: drop from mpool
continue
}
if msg.Message.Nonce > inclNonces[from] {
log.Debugf("message in mempool has too high of a nonce (%d > %d, from %s, inclcount %d) %s (%d pending for orig)", msg.Message.Nonce, inclNonces[from], from, inclCount[from], msg.Cid(), countFrom(msgs, from))
tooHighNonceMsgs++
continue
}
@ -476,5 +544,23 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
break
}
}
if tooLowFundMsgs > 0 {
log.Warnf("%d messages in mempool does not have enough funds", tooLowFundMsgs)
}
if tooHighNonceMsgs > 0 {
log.Warnf("%d messages in mempool had too high nonce", tooLowFundMsgs)
}
sm := time.Now()
if sm.Sub(start) > time.Second {
log.Warnw("SelectMessages took a long time",
"duration", sm.Sub(start),
"vmvalidate", vmValid,
"getbalance", getbal,
"msgs", len(msgs))
}
return out, nil
}