From 89d42b5958182f2310129a4708050ec3f3f0d0ae Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 4 Aug 2020 22:01:32 +0300 Subject: [PATCH] message chain construction --- chain/messagepool/selection.go | 151 +++++++++++++++++++++++++++++++-- 1 file changed, 145 insertions(+), 6 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 5963fdd5e..751811baa 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -1,6 +1,7 @@ package messagepool import ( + "slice" "time" "github.com/filecoin-project/go-address" @@ -11,7 +12,6 @@ import ( ) type msgChain struct { - actor address.Address msgs []*types.SignedMessage gasReward uint64 gasLimit uint64 @@ -45,7 +45,7 @@ func (mp *MessagePool) selectMessages(ts *types.TipSet) []*types.SignedMessage { } // 2. Sort the chains - slice.Sort(chains, func(i, j int) { + slice.Sort(chains, func(i, j int) bool { return chains[i].Before(chains[j]) }) @@ -83,7 +83,7 @@ tailLoop: // trim chains[last].Trim(gasLimit, al) - // push down + // push down if it hasn't been invalidated if mc.valid { for i := last; i < len(chains)-1; i++ { if chains[i].Before(chains[i+1]) { @@ -110,7 +110,8 @@ tailLoop: continue tailLoop } - // the merge loop ended after processing all the chains -- mark the end. + // the merge loop ended after processing all the chains and we probably still have gas to spare + // -- mark the end. last = len(chains) } @@ -118,8 +119,146 @@ tailLoop: } func (mp *MessagePool) createMessageChains(actor address.Address, mset *msgSet, ts *types.TipSet) []*msgChain { - // TODO - return nil + // collect all messages + msgs := make([]*types.SignedMessage, 0, len(mset.msgs)) + for _, m := range mset.msgs { + msgs = append(msgs, m) + } + + // sort by nonce + slice.Sort(msgs, func(i, j int) bool { + return msgs[i].Nonce < msgs[j].Nonce + }) + + // sanity checks: + // - there can be no gaps in nonces, starting from the current actor nonce + // if there is a gap, drop messages after the gap, we can't include them + // - all messages must have minimum gas and the total gas for the candidate messages + // cannot exceed the block limit; drop all messages that exceed the limit + // - the total gasReward cannot exceed the actor's balance; drop all messages that exceed + // the balance + al := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) { + return a + } + + minGas := guessgas.MinGas + a, _ := mp.api.StateGetActor(actor, nil) + curNonce := a.Nonce + balance := a.Balance + gasLimit := 0 + i := 0 + rewards := make([]uint64, 0, len(msgs)) + for i = 0; i < len(msgs); i++ { + m := msgs[i] + if m.Nonce != curNonce { + break + } + curNonce++ + + if m.GasLimit < minGas { + break + } + + gasLimit += m.GasLimit + if gasLimit > build.BlockGasLimit { + break + } + + gasUsed, _ := gasguess.GuessGasUsed(context.TODO(), types.EmptyTSK, m, al) + gasReward := gasUsed * m.GasPrice + if gasReward > balance { + break + } + balance -= gasReward + rewards = append(rewards, gasReward) + } + + // check we have a sane set of messages to construct the chains + if i > 0 { + msgs = msgs[:i] + } else { + return nil + } + + // ok, now we can construct the chains using the messages we have + // invariant: each chain has a bigger gasPerf than the next -- otherwise they can be merged + // and increase the gasPerf of the first chain + // We do this in two passes: + // - in the first pass we create chains that aggreagate messages with non-decreasing gasPerf + // - in the second pass we merge chains to maintain the invariant. + var chains []*msgChain + var curChain *msgChain + + newChain := func(m *types.SignedMessage, i int) *msgChain { + chain := new(msgChain) + chain.msgs = []*types.SignedMessage{m} + chain.gasReward = rewards[i] + chain.gasLimit = m.GasLimit + chain.gasPerf = curChain.gasRewards * build.BlockGasLimit / curChain.GasLimit + chain.valid = true + return chain + } + + // create the individual chains + for i, m := range msgs { + if curChain == nil { + curChain = newChain(m, i) + continue + } + + gasReward = curChain.gasReward + rewards[i] + gasLimit = curChain.gasLimit + m.GasLimit + gasPerf = gasReward * build.BlockGasLimit / gasLimit + + // try to add the message to the current chain -- if it decreases the gasPerf, then make a + // new chain + if gasPerf < curChain.gasPerf { + chains = append(chains, curChain) + curChain = newChain(m, i) + } else { + curChain.msgs = append(curChain.msgs, m) + curChain.gasReward = gasReward + curChain.gasLimit = gasLimit + curChain.gasPerf = gasPerf + } + } + chains = append(chains, curChain) + + // merge chains to maintain the invariant + for { + merged := 0 + + for i := len(chains) - 1; i > 0; i-- { + if chains[i].gasPerf > chains[i-1].gasPerf { + chains[i-1].msgs = append(chains[i-1].msgs, chains[i].msgs...) + chains[i-1].gasReward += chains[i].gasReward + chains[i-1].gasLimit += chains[i].gasLimit + chains[i-1].gasperf = chains[i-1].gasReward * build.BlockGasLimit / chains[i-1].gasLimit + chains[i].valid = false + merged++ + } + } + + if merged == 0 { + break + } + + // drop invalidated chains + newChains = make([]*msgChain, 0, len(chains)-merged) + for _, c := range chains { + if c.valid { + newChains = append(newChains, c) + } + } + chains = newChains + } + + // link dependent chains + for i := 0; i < len(chains)-1; i++ { + chains[i].next = chains[i+1] + } + + return chains } func (self *msgChain) Before(other *msgChain) bool {