From 6184781b49242b8029522612ad94cd45b508abc1 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 8 Apr 2015 20:47:32 +0200 Subject: [PATCH] Improved transaction pool The transaction pool will now some easily be able to pre determine the validity of a transaction by checking the following: * Account existst * gas limit higher than the instrinsic gas * enough funds to pay upfront costs * nonce check --- core/chain_makers.go | 13 ++-- core/chain_manager_test.go | 4 +- core/state/statedb.go | 12 ++-- core/state_transition.go | 31 +++++----- core/transaction_pool.go | 87 ++++++++++++++------------- core/transaction_pool_test.go | 109 +++++++++++----------------------- eth/backend.go | 2 +- miner/worker.go | 2 +- 8 files changed, 111 insertions(+), 149 deletions(-) diff --git a/core/chain_makers.go b/core/chain_makers.go index bbf1b1439..810741820 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -45,8 +45,8 @@ func NewChainMan(block *types.Block, eventMux *event.TypeMux, db common.Database return newChainManager(block, eventMux, db) } -func NewBlockProc(db common.Database, txpool *TxPool, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor { - return newBlockProcessor(db, txpool, cman, eventMux) +func NewBlockProc(db common.Database, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor { + return newBlockProcessor(db, cman, eventMux) } func NewCanonical(n int, db common.Database) (*BlockProcessor, error) { @@ -120,8 +120,10 @@ func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Data } // block processor with fake pow -func newBlockProcessor(db common.Database, txpool *TxPool, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor { - bman := NewBlockProcessor(db, db, FakePow{}, txpool, newChainManager(nil, eventMux, db), eventMux) +func newBlockProcessor(db common.Database, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor { + chainMan := newChainManager(nil, eventMux, db) + txpool := NewTxPool(eventMux, chainMan.State) + bman := NewBlockProcessor(db, db, FakePow{}, txpool, chainMan, eventMux) return bman } @@ -129,9 +131,8 @@ func newBlockProcessor(db common.Database, txpool *TxPool, cman *ChainManager, e // on result of makeChain func newCanonical(n int, db common.Database) (*BlockProcessor, error) { eventMux := &event.TypeMux{} - txpool := NewTxPool(eventMux) - bman := newBlockProcessor(db, txpool, newChainManager(nil, eventMux, db), eventMux) + bman := newBlockProcessor(db, newChainManager(nil, eventMux, db), eventMux) bman.bc.SetProcessor(bman) parent := bman.bc.CurrentBlock() if n == 0 { diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index bf172f3bf..19afe0d5c 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -255,7 +255,7 @@ func TestChainInsertions(t *testing.T) { var eventMux event.TypeMux chainMan := NewChainManager(db, db, &eventMux) - txPool := NewTxPool(&eventMux) + txPool := NewTxPool(&eventMux, chainMan.State) blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux) chainMan.SetProcessor(blockMan) @@ -301,7 +301,7 @@ func TestChainMultipleInsertions(t *testing.T) { } var eventMux event.TypeMux chainMan := NewChainManager(db, db, &eventMux) - txPool := NewTxPool(&eventMux) + txPool := NewTxPool(&eventMux, chainMan.State) blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux) chainMan.SetProcessor(blockMan) done := make(chan bool, max) diff --git a/core/state/statedb.go b/core/state/statedb.go index 0651365f0..b3050515b 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -63,14 +63,6 @@ func (self *StateDB) Logs() Logs { return logs } -/* -func (self *StateDB) Logs(txHash, blockHash common.Hash, txIndex uint) Logs { - self.logs.SetInfo(txHash, blockHash, txIndex) - - return self.logs -} -*/ - func (self *StateDB) Refund(address common.Address, gas *big.Int) { addr := address.Str() if self.refund[addr] == nil { @@ -83,6 +75,10 @@ func (self *StateDB) Refund(address common.Address, gas *big.Int) { * GETTERS */ +func (self *StateDB) HasAccount(addr common.Address) bool { + return self.GetStateObject(addr) != nil +} + // Retrieve the balance from the given address or 0 if object not found func (self *StateDB) GetBalance(addr common.Address) *big.Int { stateObject := self.GetStateObject(addr) diff --git a/core/state_transition.go b/core/state_transition.go index e67abb951..d95cbd35a 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -74,6 +74,19 @@ func MessageGasValue(msg Message) *big.Int { return new(big.Int).Mul(msg.Gas(), msg.GasPrice()) } +func IntrinsicGas(msg Message) *big.Int { + igas := new(big.Int).Set(params.TxGas) + for _, byt := range msg.Data() { + if byt != 0 { + igas.Add(igas, params.TxDataNonZeroGas) + } else { + igas.Add(igas, params.TxDataZeroGas) + } + } + + return igas +} + func ApplyMessage(env vm.Environment, msg Message, coinbase *state.StateObject) ([]byte, *big.Int, error) { return NewStateTransition(env, msg, coinbase).transitionState() } @@ -177,22 +190,8 @@ func (self *StateTransition) transitionState() (ret []byte, usedGas *big.Int, er sender = self.From() ) - // Transaction gas - if err = self.UseGas(params.TxGas); err != nil { - return nil, nil, InvalidTxError(err) - } - - // Pay data gas - dgas := new(big.Int) - for _, byt := range self.data { - if byt != 0 { - dgas.Add(dgas, params.TxDataNonZeroGas) - } else { - dgas.Add(dgas, params.TxDataZeroGas) - } - } - - if err = self.UseGas(dgas); err != nil { + // Pay intrinsic gas + if err = self.UseGas(IntrinsicGas(self.msg)); err != nil { return nil, nil, InvalidTxError(err) } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 58ea518d6..bfb54e24d 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -3,19 +3,24 @@ package core import ( "errors" "fmt" + "math/big" "sync" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/fatih/set.v0" ) var ( - txplogger = logger.NewLogger("TXP") - - ErrInvalidSender = errors.New("Invalid sender") + ErrInvalidSender = errors.New("Invalid sender") + ErrImpossibleNonce = errors.New("Impossible nonce") + ErrNonExistentAccount = errors.New("Account does not exist") + ErrInsufficientFunds = errors.New("Insufficient funds") + ErrIntrinsicGas = errors.New("Intrinsic gas too low") ) const txPoolQueueSize = 50 @@ -41,52 +46,62 @@ type TxPool struct { queueChan chan *types.Transaction // Quiting channel quit chan bool + // The state function which will allow us to do some pre checkes + currentState func() *state.StateDB // The actual pool - //pool *list.List txs map[common.Hash]*types.Transaction invalidHashes *set.Set - SecondaryProcessor TxProcessor - subscribers []chan TxMsg eventMux *event.TypeMux } -func NewTxPool(eventMux *event.TypeMux) *TxPool { +func NewTxPool(eventMux *event.TypeMux, currentStateFn func() *state.StateDB) *TxPool { return &TxPool{ txs: make(map[common.Hash]*types.Transaction), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), eventMux: eventMux, invalidHashes: set.New(), + currentState: currentStateFn, } } func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { // Validate sender - if _, err := tx.From(); err != nil { + var ( + from common.Address + err error + ) + + if from, err = tx.From(); err != nil { return ErrInvalidSender } + // Validate curve param v, _, _ := tx.Curve() if v > 28 || v < 27 { return fmt.Errorf("tx.v != (28 || 27) => %v", v) } - return nil - /* XXX this kind of validation needs to happen elsewhere in the gui when sending txs. - Other clients should do their own validation. Value transfer could throw error - but doesn't necessarily invalidate the tx. Gas can still be payed for and miner - can still be rewarded for their inclusion and processing. - sender := pool.stateQuery.GetAccount(senderAddr) - totAmount := new(big.Int).Set(tx.Value()) - // Make sure there's enough in the sender's account. Having insufficient - // funds won't invalidate this transaction but simple ignores it. - if sender.Balance().Cmp(totAmount) < 0 { - return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From()) + if !pool.currentState().HasAccount(from) { + return ErrNonExistentAccount } - */ + + if pool.currentState().GetBalance(from).Cmp(new(big.Int).Mul(tx.Price, tx.GasLimit)) < 0 { + return ErrInsufficientFunds + } + + if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 { + return ErrIntrinsicGas + } + + if pool.currentState().GetNonce(from) > tx.Nonce() { + return ErrImpossibleNonce + } + + return nil } func (self *TxPool) addTx(tx *types.Transaction) { @@ -96,10 +111,12 @@ func (self *TxPool) addTx(tx *types.Transaction) { func (self *TxPool) add(tx *types.Transaction) error { hash := tx.Hash() + /* XXX I'm unsure about this. This is extremely dangerous and may result + in total black listing of certain transactions if self.invalidHashes.Has(hash) { return fmt.Errorf("Invalid transaction (%x)", hash[:4]) } - + */ if self.txs[hash] != nil { return fmt.Errorf("Known transaction (%x)", hash[:4]) } @@ -121,7 +138,10 @@ func (self *TxPool) add(tx *types.Transaction) error { // verified in ValidateTransaction. f, _ := tx.From() from := common.Bytes2Hex(f[:4]) - txplogger.Debugf("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) + + if glog.V(logger.Debug) { + glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) + } // Notify the subscribers go self.eventMux.Post(TxPreEvent{tx}) @@ -146,10 +166,10 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) { for _, tx := range txs { if err := self.add(tx); err != nil { - txplogger.Debugln(err) + glog.V(logger.Debug).Infoln(err) } else { h := tx.Hash() - txplogger.Debugf("tx %x\n", h[:4]) + glog.V(logger.Debug).Infof("tx %x\n", h[:4]) } } } @@ -168,23 +188,6 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { return } -func (pool *TxPool) RemoveInvalid(query StateQuery) { - pool.mu.Lock() - - var removedTxs types.Transactions - for _, tx := range pool.txs { - from, _ := tx.From() - sender := query.GetAccount(from[:]) - err := pool.ValidateTransaction(tx) - if err != nil || sender.Nonce() >= tx.Nonce() { - removedTxs = append(removedTxs, tx) - } - } - pool.mu.Unlock() - - pool.RemoveSet(removedTxs) -} - func (self *TxPool) RemoveSet(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() @@ -214,5 +217,5 @@ func (pool *TxPool) Start() { func (pool *TxPool) Stop() { pool.Flush() - txplogger.Infoln("Stopped") + glog.V(logger.Info).Infoln("TX Pool stopped") } diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index abdc2709f..b7486adb3 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -13,87 +13,50 @@ import ( "github.com/ethereum/go-ethereum/event" ) -// State query interface -type stateQuery struct{ db common.Database } - -func SQ() stateQuery { - db, _ := ethdb.NewMemDatabase() - return stateQuery{db: db} -} - -func (self stateQuery) GetAccount(addr []byte) *state.StateObject { - return state.NewStateObject(common.BytesToAddress(addr), self.db) -} - func transaction() *types.Transaction { - return types.NewTransactionMessage(common.Address{}, common.Big0, common.Big0, common.Big0, nil) + return types.NewTransactionMessage(common.Address{}, big.NewInt(100), big.NewInt(100), big.NewInt(100), nil) } -func setup() (*TxPool, *ecdsa.PrivateKey) { +func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { + db, _ := ethdb.NewMemDatabase() + statedb := state.New(common.Hash{}, db) + var m event.TypeMux key, _ := crypto.GenerateKey() - return NewTxPool(&m), key + return NewTxPool(&m, func() *state.StateDB { return statedb }), key } -func TestTxAdding(t *testing.T) { - pool, key := setup() - tx1 := transaction() - tx1.SignECDSA(key) - err := pool.Add(tx1) - if err != nil { - t.Error(err) +func TestInvalidTransactions(t *testing.T) { + pool, key := setupTxPool() + + tx := transaction() + tx.SignECDSA(key) + err := pool.Add(tx) + if err != ErrNonExistentAccount { + t.Error("expected", ErrNonExistentAccount) } - err = pool.Add(tx1) - if err == nil { - t.Error("added tx twice") - } -} - -func TestAddInvalidTx(t *testing.T) { - pool, _ := setup() - tx1 := transaction() - err := pool.Add(tx1) - if err == nil { - t.Error("expected error") - } -} - -func TestRemoveSet(t *testing.T) { - pool, _ := setup() - tx1 := transaction() - pool.addTx(tx1) - pool.RemoveSet(types.Transactions{tx1}) - if pool.Size() > 0 { - t.Error("expected pool size to be 0") - } -} - -func TestRemoveInvalid(t *testing.T) { - pool, key := setup() - tx1 := transaction() - pool.addTx(tx1) - pool.RemoveInvalid(SQ()) - if pool.Size() > 0 { - t.Error("expected pool size to be 0") - } - - tx1.SetNonce(1) - tx1.SignECDSA(key) - pool.addTx(tx1) - pool.RemoveInvalid(SQ()) - if pool.Size() != 1 { - t.Error("expected pool size to be 1, is", pool.Size()) - } -} - -func TestInvalidSender(t *testing.T) { - pool, _ := setup() - tx := new(types.Transaction) - tx.R = new(big.Int) - tx.S = new(big.Int) - err := pool.ValidateTransaction(tx) - if err != ErrInvalidSender { - t.Errorf("expected %v, got %v", ErrInvalidSender, err) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1)) + err = pool.Add(tx) + if err != ErrInsufficientFunds { + t.Error("expected", ErrInsufficientFunds) + } + + pool.currentState().AddBalance(from, big.NewInt(100*100)) + err = pool.Add(tx) + if err != ErrIntrinsicGas { + t.Error("expected", ErrIntrinsicGas) + } + + pool.currentState().SetNonce(from, 1) + pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff)) + tx.GasLimit = big.NewInt(100000) + tx.Price = big.NewInt(1) + tx.SignECDSA(key) + + err = pool.Add(tx) + if err != ErrImpossibleNonce { + t.Error("expected", ErrImpossibleNonce) } } diff --git a/eth/backend.go b/eth/backend.go index 317ee7373..327a5c7f8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -195,7 +195,7 @@ func New(config *Config) (*Ethereum, error) { eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) eth.pow = ethash.New(eth.chainManager) - eth.txPool = core.NewTxPool(eth.EventMux()) + eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) eth.whisper = whisper.New() diff --git a/miner/worker.go b/miner/worker.go index 25ea95347..b74b67552 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -288,7 +288,7 @@ gasLimit: tcount++ } } - self.eth.TxPool().InvalidateSet(remove) + //self.eth.TxPool().InvalidateSet(remove) var ( uncles []*types.Header