miner: optimize SelectMessages
This commit is contained in:
parent
f86a2ced06
commit
b90666b293
@ -166,7 +166,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// sync with fake chainstore (for tests)
|
// sync with fake chainstore (for tests)
|
||||||
if fcs, ok := e.api.(interface{notifDone()}); ok {
|
if fcs, ok := e.api.(interface{ notifDone() }); ok {
|
||||||
fcs.notifDone()
|
fcs.notifDone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -671,7 +671,7 @@ func TestCalled(t *testing.T) {
|
|||||||
0: n2msg,
|
0: n2msg,
|
||||||
})
|
})
|
||||||
|
|
||||||
require.Equal(t, true, applied) // msg from H=7, which had reverted execution
|
require.Equal(t, true, applied) // msg from H=7, which had reverted execution
|
||||||
require.Equal(t, false, reverted)
|
require.Equal(t, false, reverted)
|
||||||
require.Equal(t, abi.ChainEpoch(10), appliedH)
|
require.Equal(t, abi.ChainEpoch(10), appliedH)
|
||||||
applied = false
|
applied = false
|
||||||
|
@ -262,6 +262,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tMBI := time.Now()
|
||||||
|
|
||||||
beaconPrev := mbi.PrevBeaconEntry
|
beaconPrev := mbi.PrevBeaconEntry
|
||||||
|
|
||||||
bvals, err := beacon.BeaconEntriesForBlock(ctx, m.beacon, round, beaconPrev)
|
bvals, err := beacon.BeaconEntriesForBlock(ctx, m.beacon, round, beaconPrev)
|
||||||
@ -269,6 +271,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
|
|||||||
return nil, xerrors.Errorf("get beacon entries failed: %w", err)
|
return nil, xerrors.Errorf("get beacon entries failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tDrand := time.Now()
|
||||||
|
|
||||||
hasPower, err := m.hasPower(ctx, m.address, base.TipSet)
|
hasPower, err := m.hasPower(ctx, m.address, base.TipSet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("checking if miner is slashed: %w", err)
|
return nil, xerrors.Errorf("checking if miner is slashed: %w", err)
|
||||||
@ -279,6 +283,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tPowercheck := time.Now()
|
||||||
|
|
||||||
log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(time.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds)
|
log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(time.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds)
|
||||||
|
|
||||||
rbase := beaconPrev
|
rbase := beaconPrev
|
||||||
@ -301,6 +307,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tTicket := time.Now()
|
||||||
|
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
if err := m.address.MarshalCBOR(buf); err != nil {
|
if err := m.address.MarshalCBOR(buf); err != nil {
|
||||||
return nil, xerrors.Errorf("failed to marshal miner address: %w", err)
|
return nil, xerrors.Errorf("failed to marshal miner address: %w", err)
|
||||||
@ -313,6 +321,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
|
|||||||
|
|
||||||
prand := abi.PoStRandomness(rand)
|
prand := abi.PoStRandomness(rand)
|
||||||
|
|
||||||
|
tSeed := time.Now()
|
||||||
|
|
||||||
postProof, err := m.epp.ComputeProof(ctx, mbi.Sectors, prand)
|
postProof, err := m.epp.ComputeProof(ctx, mbi.Sectors, prand)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to compute winning post proof: %w", err)
|
return nil, xerrors.Errorf("failed to compute winning post proof: %w", err)
|
||||||
@ -324,16 +334,27 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
|
|||||||
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
|
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tPending := time.Now()
|
||||||
|
|
||||||
// TODO: winning post proof
|
// TODO: winning post proof
|
||||||
b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending)
|
b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to create block: %w", err)
|
return nil, xerrors.Errorf("failed to create block: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dur := time.Since(start)
|
tCreateBlock := time.Now()
|
||||||
|
dur := tCreateBlock.Sub(start)
|
||||||
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur)
|
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur)
|
||||||
if dur > time.Second*build.BlockDelay {
|
if dur > time.Second*build.BlockDelay {
|
||||||
log.Warn("CAUTION: block production took longer than the block delay. Your computer may not be fast enough to keep up")
|
log.Warn("CAUTION: block production took longer than the block delay. Your computer may not be fast enough to keep up")
|
||||||
|
|
||||||
|
log.Warnw("tMinerBaseInfo ", "duration", tMBI.Sub(start))
|
||||||
|
log.Warnw("tDrand ", "duration", tDrand.Sub(tMBI))
|
||||||
|
log.Warnw("tPowercheck ", "duration", tPowercheck.Sub(tDrand))
|
||||||
|
log.Warnw("tTicket ", "duration", tTicket.Sub(tPowercheck))
|
||||||
|
log.Warnw("tSeed ", "duration", tSeed.Sub(tTicket))
|
||||||
|
log.Warnw("tPending ", "duration", tPending.Sub(tSeed))
|
||||||
|
log.Warnw("tCreateBlock ", "duration", tCreateBlock.Sub(tPending))
|
||||||
}
|
}
|
||||||
|
|
||||||
return b, nil
|
return b, nil
|
||||||
@ -403,6 +424,34 @@ func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *type
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type actCacheEntry struct {
|
||||||
|
act *types.Actor
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type cachedActorLookup struct {
|
||||||
|
tsk types.TipSetKey
|
||||||
|
cache map[address.Address]actCacheEntry
|
||||||
|
fallback ActorLookup
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cachedActorLookup) StateGetActor(ctx context.Context, a address.Address, tsk types.TipSetKey) (*types.Actor, error) {
|
||||||
|
if c.tsk == tsk {
|
||||||
|
e, has := c.cache[a]
|
||||||
|
if has {
|
||||||
|
return e.act, e.err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
e, err := c.fallback(ctx, a, tsk)
|
||||||
|
if c.tsk == tsk {
|
||||||
|
c.cache[a] = actCacheEntry{
|
||||||
|
act: e, err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return e, err
|
||||||
|
}
|
||||||
|
|
||||||
type ActorLookup func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error)
|
type ActorLookup func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error)
|
||||||
|
|
||||||
func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
|
func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
|
||||||
@ -415,12 +464,26 @@ func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) {
|
func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) {
|
||||||
|
al = (&cachedActorLookup{
|
||||||
|
tsk: ts.Key(),
|
||||||
|
cache: map[address.Address]actCacheEntry{},
|
||||||
|
fallback: al,
|
||||||
|
}).StateGetActor
|
||||||
|
|
||||||
out := make([]*types.SignedMessage, 0, build.BlockMessageLimit)
|
out := make([]*types.SignedMessage, 0, build.BlockMessageLimit)
|
||||||
inclNonces := make(map[address.Address]uint64)
|
inclNonces := make(map[address.Address]uint64)
|
||||||
inclBalances := make(map[address.Address]types.BigInt)
|
inclBalances := make(map[address.Address]types.BigInt)
|
||||||
inclCount := make(map[address.Address]int)
|
inclCount := make(map[address.Address]int)
|
||||||
|
|
||||||
|
tooLowFundMsgs := 0
|
||||||
|
tooHighNonceMsgs := 0
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
vmValid := time.Duration(0)
|
||||||
|
getbal := time.Duration(0)
|
||||||
|
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
|
vmstart := time.Now()
|
||||||
|
|
||||||
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
|
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
|
||||||
if err := msg.VMMessage().ValidForBlockInclusion(minGas); err != nil {
|
if err := msg.VMMessage().ValidForBlockInclusion(minGas); err != nil {
|
||||||
@ -428,6 +491,8 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vmValid += time.Since(vmstart)
|
||||||
|
|
||||||
// TODO: this should be in some more general 'validate message' call
|
// TODO: this should be in some more general 'validate message' call
|
||||||
if msg.Message.GasLimit > build.BlockGasLimit {
|
if msg.Message.GasLimit > build.BlockGasLimit {
|
||||||
log.Warnf("message in mempool had too high of a gas limit (%d)", msg.Message.GasLimit)
|
log.Warnf("message in mempool had too high of a gas limit (%d)", msg.Message.GasLimit)
|
||||||
@ -441,6 +506,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
|
|||||||
|
|
||||||
from := msg.Message.From
|
from := msg.Message.From
|
||||||
|
|
||||||
|
getBalStart := time.Now()
|
||||||
if _, ok := inclNonces[from]; !ok {
|
if _, ok := inclNonces[from]; !ok {
|
||||||
act, err := al(ctx, from, ts.Key())
|
act, err := al(ctx, from, ts.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -451,14 +517,16 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
|
|||||||
inclNonces[from] = act.Nonce
|
inclNonces[from] = act.Nonce
|
||||||
inclBalances[from] = act.Balance
|
inclBalances[from] = act.Balance
|
||||||
}
|
}
|
||||||
|
getbal += time.Since(getBalStart)
|
||||||
|
|
||||||
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
|
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
|
||||||
log.Warnf("message in mempool does not have enough funds: %s", msg.Cid())
|
tooLowFundMsgs++
|
||||||
|
// todo: drop from mpool
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg.Message.Nonce > inclNonces[from] {
|
if msg.Message.Nonce > inclNonces[from] {
|
||||||
log.Debugf("message in mempool has too high of a nonce (%d > %d, from %s, inclcount %d) %s (%d pending for orig)", msg.Message.Nonce, inclNonces[from], from, inclCount[from], msg.Cid(), countFrom(msgs, from))
|
tooHighNonceMsgs++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -476,5 +544,23 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tooLowFundMsgs > 0 {
|
||||||
|
log.Warnf("%d messages in mempool does not have enough funds", tooLowFundMsgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tooHighNonceMsgs > 0 {
|
||||||
|
log.Warnf("%d messages in mempool had too high nonce", tooLowFundMsgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
sm := time.Now()
|
||||||
|
if sm.Sub(start) > time.Second {
|
||||||
|
log.Warnw("SelectMessages took a long time",
|
||||||
|
"duration", sm.Sub(start),
|
||||||
|
"vmvalidate", vmValid,
|
||||||
|
"getbalance", getbal,
|
||||||
|
"msgs", len(msgs))
|
||||||
|
}
|
||||||
|
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user