Mempool: Selection should respect CBOR limits
This commit is contained in:
		
							parent
							
								
									518634b998
								
							
						
					
					
						commit
						3e872599ce
					
				| @ -121,7 +121,7 @@ loop: | ||||
| 
 | ||||
| 		// we can't fit the current chain but there is gas to spare
 | ||||
| 		// trim it and push it down
 | ||||
| 		chain.Trim(gasLimit, mp, baseFee) | ||||
| 		chain.Trim(gasLimit, repubMsgLimit, mp, baseFee) | ||||
| 		for j := i; j < len(chains)-1; j++ { | ||||
| 			if chains[j].Before(chains[j+1]) { | ||||
| 				break | ||||
| @ -131,6 +131,10 @@ loop: | ||||
| 	} | ||||
| 
 | ||||
| 	count := 0 | ||||
| 	if len(msgs) > repubMsgLimit { | ||||
| 		msgs = msgs[:repubMsgLimit] | ||||
| 	} | ||||
| 
 | ||||
| 	log.Infof("republishing %d messages", len(msgs)) | ||||
| 	for _, m := range msgs { | ||||
| 		mb, err := m.Serialize() | ||||
|  | ||||
| @ -7,6 +7,10 @@ import ( | ||||
| 	"sort" | ||||
| 	"time" | ||||
| 
 | ||||
| 	cbg "github.com/whyrusleeping/cbor-gen" | ||||
| 
 | ||||
| 	"github.com/filecoin-project/go-state-types/crypto" | ||||
| 
 | ||||
| 	"golang.org/x/xerrors" | ||||
| 
 | ||||
| 	"github.com/filecoin-project/go-address" | ||||
| @ -34,9 +38,10 @@ type msgChain struct { | ||||
| 	merged       bool | ||||
| 	next         *msgChain | ||||
| 	prev         *msgChain | ||||
| 	sigType      crypto.SigType | ||||
| } | ||||
| 
 | ||||
| func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) { | ||||
| func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { | ||||
| 	mp.curTsLk.Lock() | ||||
| 	defer mp.curTsLk.Unlock() | ||||
| 
 | ||||
| @ -46,24 +51,151 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq | ||||
| 	// if the ticket quality is high enough that the first block has higher probability
 | ||||
| 	// than any other block, then we don't bother with optimal selection because the
 | ||||
| 	// first block will always have higher effective performance
 | ||||
| 	var sm *selectedMessages | ||||
| 	var err error | ||||
| 	if tq > 0.84 { | ||||
| 		msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts) | ||||
| 		sm, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts) | ||||
| 	} else { | ||||
| 		msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq) | ||||
| 		sm, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq) | ||||
| 	} | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if len(msgs) > build.BlockMessageLimit { | ||||
| 		msgs = msgs[:build.BlockMessageLimit] | ||||
| 	// one last sanity check
 | ||||
| 	if len(sm.msgs) > build.BlockMessageLimit { | ||||
| 		sm.msgs = sm.msgs[:build.BlockMessageLimit] | ||||
| 	} | ||||
| 
 | ||||
| 	return msgs, nil | ||||
| 	return sm.msgs, nil | ||||
| } | ||||
| 
 | ||||
| func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { | ||||
| type selectedMessages struct { | ||||
| 	msgs      []*types.SignedMessage | ||||
| 	gasLimit  int64 | ||||
| 	secpLimit int | ||||
| 	blsLimit  int | ||||
| } | ||||
| 
 | ||||
| // returns false if chain can't be added due to block constraints
 | ||||
| func (sm *selectedMessages) tryToAdd(mc *msgChain) bool { | ||||
| 	l := len(mc.msgs) | ||||
| 
 | ||||
| 	if build.BlockMessageLimit < l+len(sm.msgs) || sm.gasLimit < mc.gasLimit { | ||||
| 		return false | ||||
| 	} | ||||
| 
 | ||||
| 	if mc.sigType == crypto.SigTypeBLS { | ||||
| 		if sm.blsLimit < l { | ||||
| 			return false | ||||
| 		} | ||||
| 
 | ||||
| 		sm.msgs = append(sm.msgs, mc.msgs...) | ||||
| 		sm.blsLimit -= l | ||||
| 		sm.gasLimit -= mc.gasLimit | ||||
| 	} else if mc.sigType == crypto.SigTypeSecp256k1 { | ||||
| 		if sm.secpLimit < l { | ||||
| 			return false | ||||
| 		} | ||||
| 
 | ||||
| 		sm.msgs = append(sm.msgs, mc.msgs...) | ||||
| 		sm.secpLimit -= l | ||||
| 		sm.gasLimit -= mc.gasLimit | ||||
| 	} | ||||
| 
 | ||||
| 	// don't add the weird sigType msg, but otherwise proceed
 | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| // returns false if messages can't be added due to block constraints
 | ||||
| // will trim / invalidate chain as appropriate
 | ||||
| func (sm *selectedMessages) tryToAddWithDeps(mc *msgChain, mp *MessagePool, baseFee types.BigInt) bool { | ||||
| 	// compute the dependencies that must be merged and the gas limit including deps
 | ||||
| 	chainGasLimit := mc.gasLimit | ||||
| 	chainMsgLimit := len(mc.msgs) | ||||
| 	depGasLimit := int64(0) | ||||
| 	depMsgLimit := 0 | ||||
| 	smMsgLimit := 0 | ||||
| 
 | ||||
| 	if mc.sigType == crypto.SigTypeBLS { | ||||
| 		smMsgLimit = sm.blsLimit | ||||
| 	} else if mc.sigType == crypto.SigTypeSecp256k1 { | ||||
| 		smMsgLimit = sm.secpLimit | ||||
| 	} else { | ||||
| 		return false | ||||
| 	} | ||||
| 
 | ||||
| 	if smMsgLimit > build.BlockMessageLimit-len(sm.msgs) { | ||||
| 		smMsgLimit = build.BlockMessageLimit - len(sm.msgs) | ||||
| 	} | ||||
| 
 | ||||
| 	var chainDeps []*msgChain | ||||
| 	for curChain := mc.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { | ||||
| 		chainDeps = append(chainDeps, curChain) | ||||
| 		chainGasLimit += curChain.gasLimit | ||||
| 		chainMsgLimit += len(curChain.msgs) | ||||
| 		depGasLimit += curChain.gasLimit | ||||
| 		depMsgLimit += len(curChain.msgs) | ||||
| 	} | ||||
| 
 | ||||
| 	// the chain doesn't fit as-is, so trim / invalidate it and return false
 | ||||
| 	if chainGasLimit > sm.gasLimit || chainMsgLimit > smMsgLimit { | ||||
| 
 | ||||
| 		// it doesn't all fit; now we have to take into account the dependent chains before
 | ||||
| 		// making a decision about trimming or invalidating.
 | ||||
| 		// if the dependencies exceed the block limits, then we must invalidate the chain
 | ||||
| 		// as it can never be included.
 | ||||
| 		// Otherwise we can just trim and continue
 | ||||
| 		if depGasLimit > sm.gasLimit || depMsgLimit >= smMsgLimit { | ||||
| 			mc.Invalidate() | ||||
| 		} else { | ||||
| 			// dependencies fit, just trim it
 | ||||
| 			mc.Trim(sm.gasLimit-depGasLimit, smMsgLimit-depMsgLimit, mp, baseFee) | ||||
| 		} | ||||
| 
 | ||||
| 		return false | ||||
| 	} | ||||
| 
 | ||||
| 	// the chain fits! include it together with all dependencies
 | ||||
| 	for i := len(chainDeps) - 1; i >= 0; i-- { | ||||
| 		curChain := chainDeps[i] | ||||
| 		curChain.merged = true | ||||
| 		sm.msgs = append(sm.msgs, curChain.msgs...) | ||||
| 	} | ||||
| 
 | ||||
| 	mc.merged = true | ||||
| 
 | ||||
| 	sm.msgs = append(sm.msgs, mc.msgs...) | ||||
| 	sm.gasLimit -= chainGasLimit | ||||
| 
 | ||||
| 	if mc.sigType == crypto.SigTypeBLS { | ||||
| 		sm.blsLimit -= chainMsgLimit | ||||
| 	} else if mc.sigType == crypto.SigTypeSecp256k1 { | ||||
| 		sm.secpLimit -= chainMsgLimit | ||||
| 	} | ||||
| 
 | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func (sm *selectedMessages) trimChain(mc *msgChain, mp *MessagePool, baseFee types.BigInt) { | ||||
| 	msgLimit := build.BlockMessageLimit - len(sm.msgs) | ||||
| 	if mc.sigType == crypto.SigTypeBLS { | ||||
| 		if msgLimit > sm.blsLimit { | ||||
| 			msgLimit = sm.blsLimit | ||||
| 		} | ||||
| 	} else if mc.sigType == crypto.SigTypeSecp256k1 { | ||||
| 		if msgLimit > sm.secpLimit { | ||||
| 			msgLimit = sm.secpLimit | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if mc.gasLimit > sm.gasLimit || len(mc.msgs) > msgLimit { | ||||
| 		mc.Trim(sm.gasLimit, msgLimit, mp, baseFee) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) (*selectedMessages, error) { | ||||
| 	start := time.Now() | ||||
| 
 | ||||
| 	baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) | ||||
| @ -89,10 +221,10 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ | ||||
| 
 | ||||
| 	// 0b. Select all priority messages that fit in the block
 | ||||
| 	minGas := int64(gasguess.MinGas) | ||||
| 	result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) | ||||
| 	result := mp.selectPriorityMessages(ctx, pending, baseFee, ts) | ||||
| 
 | ||||
| 	// have we filled the block?
 | ||||
| 	if gasLimit < minGas || len(result) >= build.BlockMessageLimit { | ||||
| 	if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit { | ||||
| 		return result, nil | ||||
| 	} | ||||
| 
 | ||||
| @ -176,26 +308,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		// compute the dependencies that must be merged and the gas limit including deps
 | ||||
| 		chainGasLimit := chain.gasLimit | ||||
| 		chainMsgLimit := len(chain.msgs) | ||||
| 		var chainDeps []*msgChain | ||||
| 		for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { | ||||
| 			chainDeps = append(chainDeps, curChain) | ||||
| 			chainGasLimit += curChain.gasLimit | ||||
| 			chainMsgLimit += len(curChain.msgs) | ||||
| 		} | ||||
| 
 | ||||
| 		// does it all fit in the block?
 | ||||
| 		if chainGasLimit <= gasLimit && chainMsgLimit+len(result) <= build.BlockMessageLimit { | ||||
| 			// include it together with all dependencies
 | ||||
| 			for i := len(chainDeps) - 1; i >= 0; i-- { | ||||
| 				curChain := chainDeps[i] | ||||
| 				curChain.merged = true | ||||
| 				result = append(result, curChain.msgs...) | ||||
| 			} | ||||
| 
 | ||||
| 			chain.merged = true | ||||
| 		if result.tryToAddWithDeps(chain, mp, baseFee) { | ||||
| 			// adjust the effective performance for all subsequent chains
 | ||||
| 			if next := chain.next; next != nil && next.effPerf > 0 { | ||||
| 				next.effPerf += next.parentOffset | ||||
| @ -203,10 +316,8 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ | ||||
| 					next.setEffPerf() | ||||
| 				} | ||||
| 			} | ||||
| 			result = append(result, chain.msgs...) | ||||
| 			gasLimit -= chainGasLimit | ||||
| 
 | ||||
| 			// resort to account for already merged chains and effective performance adjustments
 | ||||
| 			// re-sort to account for already merged chains and effective performance adjustments
 | ||||
| 			// the sort *must* be stable or we end up getting negative gasPerfs pushed up.
 | ||||
| 			sort.SliceStable(chains[i+1:], func(i, j int) bool { | ||||
| 				return chains[i].BeforeEffective(chains[j]) | ||||
| @ -215,7 +326,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		// we can't fit this chain and its dependencies because of block gasLimit -- we are
 | ||||
| 		// we can't fit this chain and its dependencies because of block limits -- we are
 | ||||
| 		// at the edge
 | ||||
| 		last = i | ||||
| 		break | ||||
| @ -232,12 +343,16 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ | ||||
| 	//    we might have to do it multiple times to satisfy tail packing
 | ||||
| 	startTail := time.Now() | ||||
| tailLoop: | ||||
| 	for gasLimit >= minGas && last < len(chains) { | ||||
| 		// trim if necessary
 | ||||
| 		if chains[last].gasLimit > gasLimit { | ||||
| 			chains[last].Trim(gasLimit, build.BlockMessageLimit-len(result), mp, baseFee) | ||||
| 	for result.gasLimit >= minGas && last < len(chains) { | ||||
| 
 | ||||
| 		if !chains[last].valid { | ||||
| 			last++ | ||||
| 			continue tailLoop | ||||
| 		} | ||||
| 
 | ||||
| 		// trim if necessary
 | ||||
| 		result.trimChain(chains[last], mp, baseFee) | ||||
| 
 | ||||
| 		// push down if it hasn't been invalidated
 | ||||
| 		if chains[last].valid { | ||||
| 			for i := last; i < len(chains)-1; i++ { | ||||
| @ -249,7 +364,7 @@ tailLoop: | ||||
| 		} | ||||
| 
 | ||||
| 		// select the next (valid and fitting) chain and its dependencies for inclusion
 | ||||
| 		for i, chain := range chains[last:] { | ||||
| 		for _, chain := range chains[last:] { | ||||
| 			// has the chain been invalidated?
 | ||||
| 			if !chain.valid { | ||||
| 				continue | ||||
| @ -265,49 +380,10 @@ tailLoop: | ||||
| 				break tailLoop | ||||
| 			} | ||||
| 
 | ||||
| 			// compute the dependencies that must be merged and the gas limit including deps
 | ||||
| 			chainGasLimit := chain.gasLimit | ||||
| 			chainMsgLimit := len(chain.msgs) | ||||
| 			depGasLimit := int64(0) | ||||
| 			depMsgLimit := 0 | ||||
| 			var chainDeps []*msgChain | ||||
| 			for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { | ||||
| 				chainDeps = append(chainDeps, curChain) | ||||
| 				chainGasLimit += curChain.gasLimit | ||||
| 				chainMsgLimit += len(curChain.msgs) | ||||
| 				depGasLimit += curChain.gasLimit | ||||
| 				depMsgLimit += len(curChain.msgs) | ||||
| 			} | ||||
| 
 | ||||
| 			// does it all fit in the bock
 | ||||
| 			if chainGasLimit <= gasLimit && len(result)+chainMsgLimit <= build.BlockMessageLimit { | ||||
| 				// include it together with all dependencies
 | ||||
| 				for i := len(chainDeps) - 1; i >= 0; i-- { | ||||
| 					curChain := chainDeps[i] | ||||
| 					curChain.merged = true | ||||
| 					result = append(result, curChain.msgs...) | ||||
| 				} | ||||
| 
 | ||||
| 				chain.merged = true | ||||
| 				result = append(result, chain.msgs...) | ||||
| 				gasLimit -= chainGasLimit | ||||
| 			if result.tryToAddWithDeps(chain, mp, baseFee) { | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| 			// it doesn't all fit; now we have to take into account the dependent chains before
 | ||||
| 			// making a decision about trimming or invalidating.
 | ||||
| 			// if the dependencies exceed the block limits, then we must invalidate the chain
 | ||||
| 			// as it can never be included.
 | ||||
| 			// Otherwise we can just trim and continue
 | ||||
| 			if depGasLimit > gasLimit || len(result)+depMsgLimit >= build.BlockMessageLimit { | ||||
| 				chain.Invalidate() | ||||
| 				last += i + 1 | ||||
| 				continue tailLoop | ||||
| 			} | ||||
| 
 | ||||
| 			// dependencies fit, just trim it
 | ||||
| 			chain.Trim(gasLimit-depGasLimit, build.BlockMessageLimit-len(result)-depMsgLimit, mp, baseFee) | ||||
| 			last += i | ||||
| 			continue tailLoop | ||||
| 		} | ||||
| 
 | ||||
| @ -321,15 +397,15 @@ tailLoop: | ||||
| 
 | ||||
| 	// if we have room to spare, pick some random (non-negative) chains to fill the block
 | ||||
| 	// we pick randomly so that we minimize the probability of duplication among all block producers
 | ||||
| 	if gasLimit >= minGas && len(result) <= build.BlockMessageLimit { | ||||
| 		randomCount := 0 | ||||
| 	if result.gasLimit >= minGas && len(result.msgs) <= build.BlockMessageLimit { | ||||
| 		preRandomLength := len(result.msgs) | ||||
| 
 | ||||
| 		startRandom := time.Now() | ||||
| 		shuffleChains(chains) | ||||
| 
 | ||||
| 		for _, chain := range chains { | ||||
| 			// have we filled the block
 | ||||
| 			if gasLimit < minGas || len(result) >= build.BlockMessageLimit { | ||||
| 			if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit { | ||||
| 				break | ||||
| 			} | ||||
| 
 | ||||
| @ -343,63 +419,31 @@ tailLoop: | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| 			// compute the dependencies that must be merged and the gas limit including deps
 | ||||
| 			chainGasLimit := chain.gasLimit | ||||
| 			chainMsgLimit := len(chain.msgs) | ||||
| 			depGasLimit := int64(0) | ||||
| 			depMsgLimit := 0 | ||||
| 			var chainDeps []*msgChain | ||||
| 			for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { | ||||
| 				chainDeps = append(chainDeps, curChain) | ||||
| 				chainGasLimit += curChain.gasLimit | ||||
| 				chainMsgLimit += len(curChain.msgs) | ||||
| 				depGasLimit += curChain.gasLimit | ||||
| 				depMsgLimit += len(curChain.msgs) | ||||
| 			} | ||||
| 
 | ||||
| 			// do the deps fit? if the deps won't fit, invalidate the chain
 | ||||
| 			if depGasLimit > gasLimit || len(result)+depMsgLimit > build.BlockMessageLimit { | ||||
| 				chain.Invalidate() | ||||
| 			if result.tryToAddWithDeps(chain, mp, baseFee) { | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| 			// do they fit as is? if it doesn't, trim to make it fit if possible
 | ||||
| 			if chainGasLimit > gasLimit || len(result)+chainMsgLimit > build.BlockMessageLimit { | ||||
| 				chain.Trim(gasLimit-depGasLimit, build.BlockMessageLimit-len(result)-depMsgLimit, mp, baseFee) | ||||
| 
 | ||||
| 				if !chain.valid { | ||||
| 			if chain.valid { | ||||
| 				// chain got trimmed on the previous call to tryToAddWithDeps, can now be included
 | ||||
| 				result.tryToAddWithDeps(chain, mp, baseFee) | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 			// include it together with all dependencies
 | ||||
| 			for i := len(chainDeps) - 1; i >= 0; i-- { | ||||
| 				curChain := chainDeps[i] | ||||
| 				curChain.merged = true | ||||
| 				result = append(result, curChain.msgs...) | ||||
| 				randomCount += len(curChain.msgs) | ||||
| 			} | ||||
| 
 | ||||
| 			chain.merged = true | ||||
| 			result = append(result, chain.msgs...) | ||||
| 			randomCount += len(chain.msgs) | ||||
| 			gasLimit -= chainGasLimit | ||||
| 		} | ||||
| 
 | ||||
| 		if dt := time.Since(startRandom); dt > time.Millisecond { | ||||
| 			log.Infow("pack random tail chains done", "took", dt) | ||||
| 		} | ||||
| 
 | ||||
| 		if randomCount > 0 { | ||||
| 		if len(result.msgs) != preRandomLength { | ||||
| 			log.Warnf("optimal selection failed to pack a block; picked %d messages with random selection", | ||||
| 				randomCount) | ||||
| 				len(result.msgs)-preRandomLength) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return result, nil | ||||
| } | ||||
| 
 | ||||
| func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { | ||||
| func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) (*selectedMessages, error) { | ||||
| 	start := time.Now() | ||||
| 
 | ||||
| 	baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) | ||||
| @ -425,10 +469,10 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type | ||||
| 
 | ||||
| 	// 0b. Select all priority messages that fit in the block
 | ||||
| 	minGas := int64(gasguess.MinGas) | ||||
| 	result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) | ||||
| 	result := mp.selectPriorityMessages(ctx, pending, baseFee, ts) | ||||
| 
 | ||||
| 	// have we filled the block?
 | ||||
| 	if gasLimit < minGas || len(result) > build.BlockMessageLimit { | ||||
| 	if result.gasLimit < minGas || len(result.msgs) > build.BlockMessageLimit { | ||||
| 		return result, nil | ||||
| 	} | ||||
| 
 | ||||
| @ -464,9 +508,8 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type | ||||
| 		} | ||||
| 
 | ||||
| 		// does it fit in the block?
 | ||||
| 		if chain.gasLimit <= gasLimit && len(result)+len(chain.msgs) <= build.BlockMessageLimit { | ||||
| 			gasLimit -= chain.gasLimit | ||||
| 			result = append(result, chain.msgs...) | ||||
| 		if result.tryToAdd(chain) { | ||||
| 			// there was room, we added the chain, keep going
 | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| @ -486,9 +529,9 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type | ||||
| 	// have to do it multiple times to satisfy tail packing.
 | ||||
| 	startTail := time.Now() | ||||
| tailLoop: | ||||
| 	for gasLimit >= minGas && last < len(chains) { | ||||
| 	for result.gasLimit >= minGas && last < len(chains) { | ||||
| 		// trim
 | ||||
| 		chains[last].Trim(gasLimit, build.BlockMessageLimit-len(result), mp, baseFee) | ||||
| 		result.trimChain(chains[last], mp, baseFee) | ||||
| 
 | ||||
| 		// push down if it hasn't been invalidated
 | ||||
| 		if chains[last].valid { | ||||
| @ -513,9 +556,8 @@ tailLoop: | ||||
| 			} | ||||
| 
 | ||||
| 			// does it fit in the bock?
 | ||||
| 			if chain.gasLimit <= gasLimit && len(result)+len(chain.msgs) <= build.BlockMessageLimit { | ||||
| 				gasLimit -= chain.gasLimit | ||||
| 				result = append(result, chain.msgs...) | ||||
| 			if result.tryToAdd(chain) { | ||||
| 				// there was room, we added the chain, keep going
 | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| @ -535,7 +577,7 @@ tailLoop: | ||||
| 	return result, nil | ||||
| } | ||||
| 
 | ||||
| func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) { | ||||
| func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *selectedMessages { | ||||
| 	start := time.Now() | ||||
| 	defer func() { | ||||
| 		if dt := time.Since(start); dt > time.Millisecond { | ||||
| @ -543,8 +585,12 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a | ||||
| 		} | ||||
| 	}() | ||||
| 	mpCfg := mp.getConfig() | ||||
| 	result := make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow) | ||||
| 	gasLimit := int64(build.BlockGasLimit) | ||||
| 	result := &selectedMessages{ | ||||
| 		msgs:      make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow), | ||||
| 		gasLimit:  int64(build.BlockGasLimit), | ||||
| 		blsLimit:  cbg.MaxLength, | ||||
| 		secpLimit: cbg.MaxLength, | ||||
| 	} | ||||
| 	minGas := int64(gasguess.MinGas) | ||||
| 
 | ||||
| 	// 1. Get priority actor chains
 | ||||
| @ -554,7 +600,7 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a | ||||
| 		pk, err := mp.resolveToKey(ctx, actor) | ||||
| 		if err != nil { | ||||
| 			log.Debugf("mpooladdlocal failed to resolve sender: %s", err) | ||||
| 			return nil, gasLimit | ||||
| 			return result | ||||
| 		} | ||||
| 
 | ||||
| 		mset, ok := pending[pk] | ||||
| @ -566,9 +612,8 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a | ||||
| 			chains = append(chains, next...) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if len(chains) == 0 { | ||||
| 		return nil, gasLimit | ||||
| 		return result | ||||
| 	} | ||||
| 
 | ||||
| 	// 2. Sort the chains
 | ||||
| @ -578,7 +623,7 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a | ||||
| 
 | ||||
| 	if len(chains) != 0 && chains[0].gasPerf < 0 { | ||||
| 		log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf) | ||||
| 		return nil, gasLimit | ||||
| 		return result | ||||
| 	} | ||||
| 
 | ||||
| 	// 3. Merge chains until the block limit, as long as they have non-negative gas performance
 | ||||
| @ -588,9 +633,8 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		if chain.gasLimit <= gasLimit { | ||||
| 			gasLimit -= chain.gasLimit | ||||
| 			result = append(result, chain.msgs...) | ||||
| 		if result.tryToAdd(chain) { | ||||
| 			// there was room, we added the chain, keep going
 | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| @ -600,9 +644,10 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a | ||||
| 	} | ||||
| 
 | ||||
| tailLoop: | ||||
| 	for gasLimit >= minGas && last < len(chains) { | ||||
| 	for result.gasLimit >= minGas && last < len(chains) { | ||||
| 		// trim, discarding negative performing messages
 | ||||
| 		chains[last].Trim(gasLimit, mp, baseFee) | ||||
| 
 | ||||
| 		result.trimChain(chains[last], mp, baseFee) | ||||
| 
 | ||||
| 		// push down if it hasn't been invalidated
 | ||||
| 		if chains[last].valid { | ||||
| @ -627,9 +672,8 @@ tailLoop: | ||||
| 			} | ||||
| 
 | ||||
| 			// does it fit in the bock?
 | ||||
| 			if chain.gasLimit <= gasLimit { | ||||
| 				gasLimit -= chain.gasLimit | ||||
| 				result = append(result, chain.msgs...) | ||||
| 			if result.tryToAdd(chain) { | ||||
| 				// there was room, we added the chain, keep going
 | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| @ -643,7 +687,7 @@ tailLoop: | ||||
| 		break | ||||
| 	} | ||||
| 
 | ||||
| 	return result, gasLimit | ||||
| 	return result | ||||
| } | ||||
| 
 | ||||
| func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) { | ||||
| @ -811,6 +855,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 | ||||
| 		chain.gasLimit = m.Message.GasLimit | ||||
| 		chain.gasPerf = mp.getGasPerf(chain.gasReward, chain.gasLimit) | ||||
| 		chain.valid = true | ||||
| 		chain.sigType = m.Signature.Type | ||||
| 		return chain | ||||
| 	} | ||||
| 
 | ||||
| @ -910,6 +955,7 @@ func (mc *msgChain) Trim(gasLimit int64, msgLimit int, mp *MessagePool, baseFee | ||||
| 		mc.msgs = mc.msgs[:i+1] | ||||
| 	} | ||||
| 
 | ||||
| 	// TODO: if the trim above is a no-op, this (may) needlessly invalidates the next chain
 | ||||
| 	if mc.next != nil { | ||||
| 		mc.next.Invalidate() | ||||
| 		mc.next = nil | ||||
|  | ||||
| @ -577,7 +577,7 @@ func TestMessageSelectionTrimming(t *testing.T) { | ||||
| 
 | ||||
| 	expected := int(build.BlockGasLimit / gasLimit) | ||||
| 	if len(msgs) != expected { | ||||
| 		t.Fatalf("expected %d messages, bug got %d", expected, len(msgs)) | ||||
| 		t.Fatalf("expected %d messages, but got %d", expected, len(msgs)) | ||||
| 	} | ||||
| 
 | ||||
| 	mGasLimit := int64(0) | ||||
| @ -978,7 +978,7 @@ func TestOptimalMessageSelection2(t *testing.T) { | ||||
| func TestOptimalMessageSelection3(t *testing.T) { | ||||
| 	// this test uses 10 actors sending a block of messages to each other, with the the first
 | ||||
| 	// actors paying higher gas premium than the subsequent actors.
 | ||||
| 	// We select with a low ticket quality; the chain depenent merging algorithm should pick
 | ||||
| 	// We select with a low ticket quality; the chain dependent merging algorithm should pick
 | ||||
| 	// messages from the median actor from the start
 | ||||
| 	mp, tma := makeTestMpool() | ||||
| 
 | ||||
| @ -1109,11 +1109,13 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu | ||||
| 	logging.SetLogLevel("messagepool", "error") | ||||
| 
 | ||||
| 	// 1. greedy selection
 | ||||
| 	greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts) | ||||
| 	gm, err := mp.selectMessagesGreedy(context.Background(), ts, ts) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	greedyMsgs := gm.msgs | ||||
| 
 | ||||
| 	totalGreedyCapacity := 0.0 | ||||
| 	totalGreedyReward := 0.0 | ||||
| 	totalOptimalCapacity := 0.0 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user