Improved transaction pool

The transaction pool will now some easily be able to pre determine the
validity of a transaction by checking the following:

* Account existst
* gas limit higher than the instrinsic gas
* enough funds to pay upfront costs
* nonce check
This commit is contained in:
obscuren 2015-04-08 20:47:32 +02:00
parent a7750c929b
commit 6184781b49
8 changed files with 111 additions and 149 deletions

View File

@ -45,8 +45,8 @@ func NewChainMan(block *types.Block, eventMux *event.TypeMux, db common.Database
return newChainManager(block, eventMux, db) return newChainManager(block, eventMux, db)
} }
func NewBlockProc(db common.Database, txpool *TxPool, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor { func NewBlockProc(db common.Database, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor {
return newBlockProcessor(db, txpool, cman, eventMux) return newBlockProcessor(db, cman, eventMux)
} }
func NewCanonical(n int, db common.Database) (*BlockProcessor, error) { func NewCanonical(n int, db common.Database) (*BlockProcessor, error) {
@ -120,8 +120,10 @@ func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Data
} }
// block processor with fake pow // block processor with fake pow
func newBlockProcessor(db common.Database, txpool *TxPool, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor { func newBlockProcessor(db common.Database, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor {
bman := NewBlockProcessor(db, db, FakePow{}, txpool, newChainManager(nil, eventMux, db), eventMux) chainMan := newChainManager(nil, eventMux, db)
txpool := NewTxPool(eventMux, chainMan.State)
bman := NewBlockProcessor(db, db, FakePow{}, txpool, chainMan, eventMux)
return bman return bman
} }
@ -129,9 +131,8 @@ func newBlockProcessor(db common.Database, txpool *TxPool, cman *ChainManager, e
// on result of makeChain // on result of makeChain
func newCanonical(n int, db common.Database) (*BlockProcessor, error) { func newCanonical(n int, db common.Database) (*BlockProcessor, error) {
eventMux := &event.TypeMux{} eventMux := &event.TypeMux{}
txpool := NewTxPool(eventMux)
bman := newBlockProcessor(db, txpool, newChainManager(nil, eventMux, db), eventMux) bman := newBlockProcessor(db, newChainManager(nil, eventMux, db), eventMux)
bman.bc.SetProcessor(bman) bman.bc.SetProcessor(bman)
parent := bman.bc.CurrentBlock() parent := bman.bc.CurrentBlock()
if n == 0 { if n == 0 {

View File

@ -255,7 +255,7 @@ func TestChainInsertions(t *testing.T) {
var eventMux event.TypeMux var eventMux event.TypeMux
chainMan := NewChainManager(db, db, &eventMux) chainMan := NewChainManager(db, db, &eventMux)
txPool := NewTxPool(&eventMux) txPool := NewTxPool(&eventMux, chainMan.State)
blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux) blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux)
chainMan.SetProcessor(blockMan) chainMan.SetProcessor(blockMan)
@ -301,7 +301,7 @@ func TestChainMultipleInsertions(t *testing.T) {
} }
var eventMux event.TypeMux var eventMux event.TypeMux
chainMan := NewChainManager(db, db, &eventMux) chainMan := NewChainManager(db, db, &eventMux)
txPool := NewTxPool(&eventMux) txPool := NewTxPool(&eventMux, chainMan.State)
blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux) blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux)
chainMan.SetProcessor(blockMan) chainMan.SetProcessor(blockMan)
done := make(chan bool, max) done := make(chan bool, max)

View File

@ -63,14 +63,6 @@ func (self *StateDB) Logs() Logs {
return logs return logs
} }
/*
func (self *StateDB) Logs(txHash, blockHash common.Hash, txIndex uint) Logs {
self.logs.SetInfo(txHash, blockHash, txIndex)
return self.logs
}
*/
func (self *StateDB) Refund(address common.Address, gas *big.Int) { func (self *StateDB) Refund(address common.Address, gas *big.Int) {
addr := address.Str() addr := address.Str()
if self.refund[addr] == nil { if self.refund[addr] == nil {
@ -83,6 +75,10 @@ func (self *StateDB) Refund(address common.Address, gas *big.Int) {
* GETTERS * GETTERS
*/ */
func (self *StateDB) HasAccount(addr common.Address) bool {
return self.GetStateObject(addr) != nil
}
// Retrieve the balance from the given address or 0 if object not found // Retrieve the balance from the given address or 0 if object not found
func (self *StateDB) GetBalance(addr common.Address) *big.Int { func (self *StateDB) GetBalance(addr common.Address) *big.Int {
stateObject := self.GetStateObject(addr) stateObject := self.GetStateObject(addr)

View File

@ -74,6 +74,19 @@ func MessageGasValue(msg Message) *big.Int {
return new(big.Int).Mul(msg.Gas(), msg.GasPrice()) return new(big.Int).Mul(msg.Gas(), msg.GasPrice())
} }
func IntrinsicGas(msg Message) *big.Int {
igas := new(big.Int).Set(params.TxGas)
for _, byt := range msg.Data() {
if byt != 0 {
igas.Add(igas, params.TxDataNonZeroGas)
} else {
igas.Add(igas, params.TxDataZeroGas)
}
}
return igas
}
func ApplyMessage(env vm.Environment, msg Message, coinbase *state.StateObject) ([]byte, *big.Int, error) { func ApplyMessage(env vm.Environment, msg Message, coinbase *state.StateObject) ([]byte, *big.Int, error) {
return NewStateTransition(env, msg, coinbase).transitionState() return NewStateTransition(env, msg, coinbase).transitionState()
} }
@ -177,22 +190,8 @@ func (self *StateTransition) transitionState() (ret []byte, usedGas *big.Int, er
sender = self.From() sender = self.From()
) )
// Transaction gas // Pay intrinsic gas
if err = self.UseGas(params.TxGas); err != nil { if err = self.UseGas(IntrinsicGas(self.msg)); err != nil {
return nil, nil, InvalidTxError(err)
}
// Pay data gas
dgas := new(big.Int)
for _, byt := range self.data {
if byt != 0 {
dgas.Add(dgas, params.TxDataNonZeroGas)
} else {
dgas.Add(dgas, params.TxDataZeroGas)
}
}
if err = self.UseGas(dgas); err != nil {
return nil, nil, InvalidTxError(err) return nil, nil, InvalidTxError(err)
} }

View File

@ -3,19 +3,24 @@ package core
import ( import (
"errors" "errors"
"fmt" "fmt"
"math/big"
"sync" "sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
var ( var (
txplogger = logger.NewLogger("TXP") ErrInvalidSender = errors.New("Invalid sender")
ErrImpossibleNonce = errors.New("Impossible nonce")
ErrInvalidSender = errors.New("Invalid sender") ErrNonExistentAccount = errors.New("Account does not exist")
ErrInsufficientFunds = errors.New("Insufficient funds")
ErrIntrinsicGas = errors.New("Intrinsic gas too low")
) )
const txPoolQueueSize = 50 const txPoolQueueSize = 50
@ -41,52 +46,62 @@ type TxPool struct {
queueChan chan *types.Transaction queueChan chan *types.Transaction
// Quiting channel // Quiting channel
quit chan bool quit chan bool
// The state function which will allow us to do some pre checkes
currentState func() *state.StateDB
// The actual pool // The actual pool
//pool *list.List
txs map[common.Hash]*types.Transaction txs map[common.Hash]*types.Transaction
invalidHashes *set.Set invalidHashes *set.Set
SecondaryProcessor TxProcessor
subscribers []chan TxMsg subscribers []chan TxMsg
eventMux *event.TypeMux eventMux *event.TypeMux
} }
func NewTxPool(eventMux *event.TypeMux) *TxPool { func NewTxPool(eventMux *event.TypeMux, currentStateFn func() *state.StateDB) *TxPool {
return &TxPool{ return &TxPool{
txs: make(map[common.Hash]*types.Transaction), txs: make(map[common.Hash]*types.Transaction),
queueChan: make(chan *types.Transaction, txPoolQueueSize), queueChan: make(chan *types.Transaction, txPoolQueueSize),
quit: make(chan bool), quit: make(chan bool),
eventMux: eventMux, eventMux: eventMux,
invalidHashes: set.New(), invalidHashes: set.New(),
currentState: currentStateFn,
} }
} }
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
// Validate sender // Validate sender
if _, err := tx.From(); err != nil { var (
from common.Address
err error
)
if from, err = tx.From(); err != nil {
return ErrInvalidSender return ErrInvalidSender
} }
// Validate curve param // Validate curve param
v, _, _ := tx.Curve() v, _, _ := tx.Curve()
if v > 28 || v < 27 { if v > 28 || v < 27 {
return fmt.Errorf("tx.v != (28 || 27) => %v", v) return fmt.Errorf("tx.v != (28 || 27) => %v", v)
} }
return nil
/* XXX this kind of validation needs to happen elsewhere in the gui when sending txs. if !pool.currentState().HasAccount(from) {
Other clients should do their own validation. Value transfer could throw error return ErrNonExistentAccount
but doesn't necessarily invalidate the tx. Gas can still be payed for and miner
can still be rewarded for their inclusion and processing.
sender := pool.stateQuery.GetAccount(senderAddr)
totAmount := new(big.Int).Set(tx.Value())
// Make sure there's enough in the sender's account. Having insufficient
// funds won't invalidate this transaction but simple ignores it.
if sender.Balance().Cmp(totAmount) < 0 {
return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From())
} }
*/
if pool.currentState().GetBalance(from).Cmp(new(big.Int).Mul(tx.Price, tx.GasLimit)) < 0 {
return ErrInsufficientFunds
}
if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
return ErrIntrinsicGas
}
if pool.currentState().GetNonce(from) > tx.Nonce() {
return ErrImpossibleNonce
}
return nil
} }
func (self *TxPool) addTx(tx *types.Transaction) { func (self *TxPool) addTx(tx *types.Transaction) {
@ -96,10 +111,12 @@ func (self *TxPool) addTx(tx *types.Transaction) {
func (self *TxPool) add(tx *types.Transaction) error { func (self *TxPool) add(tx *types.Transaction) error {
hash := tx.Hash() hash := tx.Hash()
/* XXX I'm unsure about this. This is extremely dangerous and may result
in total black listing of certain transactions
if self.invalidHashes.Has(hash) { if self.invalidHashes.Has(hash) {
return fmt.Errorf("Invalid transaction (%x)", hash[:4]) return fmt.Errorf("Invalid transaction (%x)", hash[:4])
} }
*/
if self.txs[hash] != nil { if self.txs[hash] != nil {
return fmt.Errorf("Known transaction (%x)", hash[:4]) return fmt.Errorf("Known transaction (%x)", hash[:4])
} }
@ -121,7 +138,10 @@ func (self *TxPool) add(tx *types.Transaction) error {
// verified in ValidateTransaction. // verified in ValidateTransaction.
f, _ := tx.From() f, _ := tx.From()
from := common.Bytes2Hex(f[:4]) from := common.Bytes2Hex(f[:4])
txplogger.Debugf("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
if glog.V(logger.Debug) {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
}
// Notify the subscribers // Notify the subscribers
go self.eventMux.Post(TxPreEvent{tx}) go self.eventMux.Post(TxPreEvent{tx})
@ -146,10 +166,10 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) {
for _, tx := range txs { for _, tx := range txs {
if err := self.add(tx); err != nil { if err := self.add(tx); err != nil {
txplogger.Debugln(err) glog.V(logger.Debug).Infoln(err)
} else { } else {
h := tx.Hash() h := tx.Hash()
txplogger.Debugf("tx %x\n", h[:4]) glog.V(logger.Debug).Infof("tx %x\n", h[:4])
} }
} }
} }
@ -168,23 +188,6 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
return return
} }
func (pool *TxPool) RemoveInvalid(query StateQuery) {
pool.mu.Lock()
var removedTxs types.Transactions
for _, tx := range pool.txs {
from, _ := tx.From()
sender := query.GetAccount(from[:])
err := pool.ValidateTransaction(tx)
if err != nil || sender.Nonce() >= tx.Nonce() {
removedTxs = append(removedTxs, tx)
}
}
pool.mu.Unlock()
pool.RemoveSet(removedTxs)
}
func (self *TxPool) RemoveSet(txs types.Transactions) { func (self *TxPool) RemoveSet(txs types.Transactions) {
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
@ -214,5 +217,5 @@ func (pool *TxPool) Start() {
func (pool *TxPool) Stop() { func (pool *TxPool) Stop() {
pool.Flush() pool.Flush()
txplogger.Infoln("Stopped") glog.V(logger.Info).Infoln("TX Pool stopped")
} }

View File

@ -13,87 +13,50 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
) )
// State query interface
type stateQuery struct{ db common.Database }
func SQ() stateQuery {
db, _ := ethdb.NewMemDatabase()
return stateQuery{db: db}
}
func (self stateQuery) GetAccount(addr []byte) *state.StateObject {
return state.NewStateObject(common.BytesToAddress(addr), self.db)
}
func transaction() *types.Transaction { func transaction() *types.Transaction {
return types.NewTransactionMessage(common.Address{}, common.Big0, common.Big0, common.Big0, nil) return types.NewTransactionMessage(common.Address{}, big.NewInt(100), big.NewInt(100), big.NewInt(100), nil)
} }
func setup() (*TxPool, *ecdsa.PrivateKey) { func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
db, _ := ethdb.NewMemDatabase()
statedb := state.New(common.Hash{}, db)
var m event.TypeMux var m event.TypeMux
key, _ := crypto.GenerateKey() key, _ := crypto.GenerateKey()
return NewTxPool(&m), key return NewTxPool(&m, func() *state.StateDB { return statedb }), key
} }
func TestTxAdding(t *testing.T) { func TestInvalidTransactions(t *testing.T) {
pool, key := setup() pool, key := setupTxPool()
tx1 := transaction()
tx1.SignECDSA(key) tx := transaction()
err := pool.Add(tx1) tx.SignECDSA(key)
if err != nil { err := pool.Add(tx)
t.Error(err) if err != ErrNonExistentAccount {
t.Error("expected", ErrNonExistentAccount)
} }
err = pool.Add(tx1) from, _ := tx.From()
if err == nil { pool.currentState().AddBalance(from, big.NewInt(1))
t.Error("added tx twice") err = pool.Add(tx)
} if err != ErrInsufficientFunds {
} t.Error("expected", ErrInsufficientFunds)
}
func TestAddInvalidTx(t *testing.T) {
pool, _ := setup() pool.currentState().AddBalance(from, big.NewInt(100*100))
tx1 := transaction() err = pool.Add(tx)
err := pool.Add(tx1) if err != ErrIntrinsicGas {
if err == nil { t.Error("expected", ErrIntrinsicGas)
t.Error("expected error") }
}
} pool.currentState().SetNonce(from, 1)
pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff))
func TestRemoveSet(t *testing.T) { tx.GasLimit = big.NewInt(100000)
pool, _ := setup() tx.Price = big.NewInt(1)
tx1 := transaction() tx.SignECDSA(key)
pool.addTx(tx1)
pool.RemoveSet(types.Transactions{tx1}) err = pool.Add(tx)
if pool.Size() > 0 { if err != ErrImpossibleNonce {
t.Error("expected pool size to be 0") t.Error("expected", ErrImpossibleNonce)
}
}
func TestRemoveInvalid(t *testing.T) {
pool, key := setup()
tx1 := transaction()
pool.addTx(tx1)
pool.RemoveInvalid(SQ())
if pool.Size() > 0 {
t.Error("expected pool size to be 0")
}
tx1.SetNonce(1)
tx1.SignECDSA(key)
pool.addTx(tx1)
pool.RemoveInvalid(SQ())
if pool.Size() != 1 {
t.Error("expected pool size to be 1, is", pool.Size())
}
}
func TestInvalidSender(t *testing.T) {
pool, _ := setup()
tx := new(types.Transaction)
tx.R = new(big.Int)
tx.S = new(big.Int)
err := pool.ValidateTransaction(tx)
if err != ErrInvalidSender {
t.Errorf("expected %v, got %v", ErrInvalidSender, err)
} }
} }

View File

@ -195,7 +195,7 @@ func New(config *Config) (*Ethereum, error) {
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
eth.pow = ethash.New(eth.chainManager) eth.pow = ethash.New(eth.chainManager)
eth.txPool = core.NewTxPool(eth.EventMux()) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
eth.chainManager.SetProcessor(eth.blockProcessor) eth.chainManager.SetProcessor(eth.blockProcessor)
eth.whisper = whisper.New() eth.whisper = whisper.New()

View File

@ -288,7 +288,7 @@ gasLimit:
tcount++ tcount++
} }
} }
self.eth.TxPool().InvalidateSet(remove) //self.eth.TxPool().InvalidateSet(remove)
var ( var (
uncles []*types.Header uncles []*types.Header