From fa4cbad315609e41d88c59ecbce7c6c6169fc57a Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 19 Feb 2015 22:33:22 +0100 Subject: [PATCH] Optimisations and fixed a couple of DDOS issues in the miner --- cmd/mist/assets/examples/bomb.html | 22 ++++++++++++++ cmd/mist/gui.go | 20 ++----------- core/block_processor.go | 25 +++++++++------- core/chain_manager.go | 25 ++++++++++++---- core/error.go | 17 ----------- core/events.go | 3 ++ core/state_transition.go | 3 +- core/transaction_pool.go | 33 +++++++++++++++------ ethutil/common.go | 7 +++++ miner/worker.go | 22 ++++++++------ pow/ezp/pow.go | 2 +- rpc/packages.go | 1 + state/state_object.go | 4 +++ state/statedb.go | 47 +++++++++++++++++------------- xeth/xeth.go | 29 +++++++++--------- 15 files changed, 156 insertions(+), 104 deletions(-) create mode 100644 cmd/mist/assets/examples/bomb.html diff --git a/cmd/mist/assets/examples/bomb.html b/cmd/mist/assets/examples/bomb.html new file mode 100644 index 000000000..62540f9bb --- /dev/null +++ b/cmd/mist/assets/examples/bomb.html @@ -0,0 +1,22 @@ + + + + + + + + + + + + diff --git a/cmd/mist/gui.go b/cmd/mist/gui.go index 1e3efd269..afddeb8f6 100644 --- a/cmd/mist/gui.go +++ b/cmd/mist/gui.go @@ -387,14 +387,11 @@ func (gui *Gui) update() { generalUpdateTicker := time.NewTicker(500 * time.Millisecond) statsUpdateTicker := time.NewTicker(5 * time.Second) - state := gui.eth.ChainManager().TransState() - - gui.win.Root().Call("setWalletValue", fmt.Sprintf("%v", ethutil.CurrencyToString(state.GetAccount(gui.address()).Balance()))) - lastBlockLabel := gui.getObjectByName("lastBlockLabel") miningLabel := gui.getObjectByName("miningLabel") events := gui.eth.EventMux().Subscribe( + core.ChainEvent{}, core.TxPreEvent{}, core.TxPostEvent{}, ) @@ -407,6 +404,8 @@ func (gui *Gui) update() { return } switch ev := ev.(type) { + case core.ChainEvent: + gui.processBlock(ev.Block, false) case core.TxPreEvent: gui.insertTransaction("pre", ev.Tx) @@ -422,19 +421,6 @@ func (gui *Gui) update() { lastBlockLabel.Set("text", statusText) miningLabel.Set("text", "Mining @ "+strconv.FormatInt(gui.uiLib.Miner().HashRate(), 10)+"/Khash") - /* - blockLength := gui.eth.BlockPool().BlocksProcessed - chainLength := gui.eth.BlockPool().ChainLength - - var ( - pct float64 = 1.0 / float64(chainLength) * float64(blockLength) - dlWidget = gui.win.Root().ObjectByName("downloadIndicator") - dlLabel = gui.win.Root().ObjectByName("downloadLabel") - ) - dlWidget.Set("value", pct) - dlLabel.Set("text", fmt.Sprintf("%d / %d", blockLength, chainLength)) - */ - case <-statsUpdateTicker.C: gui.setStatsPane() } diff --git a/core/block_processor.go b/core/block_processor.go index b4449100f..a9795385f 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -73,24 +73,27 @@ func (sm *BlockProcessor) TransitionState(statedb *state.StateDB, parent, block return receipts, nil } -func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, state *state.StateDB, block *types.Block, tx *types.Transaction, usedGas *big.Int, transientProcess bool) (*types.Receipt, *big.Int, error) { +func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, statedb *state.StateDB, block *types.Block, tx *types.Transaction, usedGas *big.Int, transientProcess bool) (*types.Receipt, *big.Int, error) { // If we are mining this block and validating we want to set the logs back to 0 - state.EmptyLogs() + statedb.EmptyLogs() txGas := new(big.Int).Set(tx.Gas()) - cb := state.GetStateObject(coinbase.Address()) - st := NewStateTransition(NewEnv(state, self.bc, tx, block), tx, cb) + cb := statedb.GetStateObject(coinbase.Address()) + st := NewStateTransition(NewEnv(statedb, self.bc, tx, block), tx, cb) _, err := st.TransitionState() + if err != nil && (IsNonceErr(err) || state.IsGasLimitErr(err)) { + return nil, nil, err + } txGas.Sub(txGas, st.gas) // Update the state with pending changes - state.Update(txGas) + statedb.Update(txGas) cumulative := new(big.Int).Set(usedGas.Add(usedGas, txGas)) - receipt := types.NewReceipt(state.Root(), cumulative) - receipt.SetLogs(state.Logs()) + receipt := types.NewReceipt(statedb.Root(), cumulative) + receipt.SetLogs(statedb.Logs()) receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) chainlogger.Debugln(receipt) @@ -99,12 +102,12 @@ func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, state go self.eventMux.Post(TxPostEvent{tx}) } - go self.eventMux.Post(state.Logs()) + go self.eventMux.Post(statedb.Logs()) return receipt, txGas, err } -func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, state *state.StateDB, block *types.Block, txs types.Transactions, transientProcess bool) (types.Receipts, types.Transactions, types.Transactions, types.Transactions, error) { +func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, statedb *state.StateDB, block *types.Block, txs types.Transactions, transientProcess bool) (types.Receipts, types.Transactions, types.Transactions, types.Transactions, error) { var ( receipts types.Receipts handled, unhandled types.Transactions @@ -115,12 +118,12 @@ func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, state ) for _, tx := range txs { - receipt, txGas, err := self.ApplyTransaction(coinbase, state, block, tx, totalUsedGas, transientProcess) + receipt, txGas, err := self.ApplyTransaction(coinbase, statedb, block, tx, totalUsedGas, transientProcess) if err != nil { switch { case IsNonceErr(err): return nil, nil, nil, nil, err - case IsGasLimitErr(err): + case state.IsGasLimitErr(err): return nil, nil, nil, nil, err default: statelogger.Infoln(err) diff --git a/core/chain_manager.go b/core/chain_manager.go index 286282064..003781791 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -134,14 +134,11 @@ func (self *ChainManager) State() *state.StateDB { func (self *ChainManager) TransState() *state.StateDB { self.tsmu.RLock() defer self.tsmu.RUnlock() - //tmp := self.transState return self.transState } func (self *ChainManager) setTransState(statedb *state.StateDB) { - self.tsmu.Lock() - defer self.tsmu.Unlock() self.transState = statedb } @@ -361,6 +358,9 @@ func (bc *ChainManager) Stop() { } func (self *ChainManager) InsertChain(chain types.Blocks) error { + self.tsmu.Lock() + defer self.tsmu.Unlock() + for _, block := range chain { td, err := self.processor.Process(block) if err != nil { @@ -376,6 +376,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { } block.Td = td + var chain, split bool self.mu.Lock() { self.write(block) @@ -383,16 +384,26 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { if td.Cmp(self.td) > 0 { if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 { chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) + split = true } self.setTotalDifficulty(td) self.insert(block) - self.setTransState(state.New(cblock.Root(), self.db)) - self.eventMux.Post(ChainEvent{block, td}) + chain = true } } self.mu.Unlock() + + if chain { + //self.setTransState(state.New(block.Root(), self.db)) + self.eventMux.Post(ChainEvent{block, td}) + } + + if split { + self.setTransState(state.New(block.Root(), self.db)) + self.eventMux.Post(ChainSplitEvent{block}) + } } return nil @@ -402,3 +413,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { func (self *ChainManager) GetAccount(addr []byte) *state.StateObject { return self.State().GetAccount(addr) } + +func (self *ChainManager) TransMut() *sync.RWMutex { + return &self.tsmu +} diff --git a/core/error.go b/core/error.go index 6af48ac2d..e86bacb2d 100644 --- a/core/error.go +++ b/core/error.go @@ -68,23 +68,6 @@ func IsValidationErr(err error) bool { return ok } -type GasLimitErr struct { - Message string - Is, Max *big.Int -} - -func IsGasLimitErr(err error) bool { - _, ok := err.(*GasLimitErr) - - return ok -} -func (err *GasLimitErr) Error() string { - return err.Message -} -func GasLimitError(is, max *big.Int) *GasLimitErr { - return &GasLimitErr{Message: fmt.Sprintf("GasLimit error. Max %s, transaction would take it to %s", max, is), Is: is, Max: max} -} - type NonceErr struct { Message string Is, Exp uint64 diff --git a/core/events.go b/core/events.go index fe106da49..4cbbc609c 100644 --- a/core/events.go +++ b/core/events.go @@ -13,3 +13,6 @@ type NewBlockEvent struct{ Block *types.Block } // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct{ Block *types.Block } + +// ChainSplit is posted when a new head is detected +type ChainSplitEvent struct{ Block *types.Block } diff --git a/core/state_transition.go b/core/state_transition.go index 33dd45f02..e82be647d 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -166,7 +166,8 @@ func (self *StateTransition) TransitionState() (ret []byte, err error) { defer self.RefundGas() // Increment the nonce for the next transaction - sender.Nonce += 1 + self.state.SetNonce(sender.Address(), sender.Nonce+1) + //sender.Nonce += 1 // Transaction gas if err = self.UseGas(vm.GasTx); err != nil { diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 7a901fcae..894b6c440 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -3,6 +3,7 @@ package core import ( "errors" "fmt" + "sync" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethutil" @@ -35,6 +36,7 @@ type TxProcessor interface { // guarantee a non blocking pool we use a queue channel which can be // independently read without needing access to the actual pool. type TxPool struct { + mu sync.RWMutex // Queueing channel for reading and writing incoming // transactions to queueChan chan *types.Transaction @@ -97,7 +99,7 @@ func (self *TxPool) addTx(tx *types.Transaction) { self.txs[string(tx.Hash())] = tx } -func (self *TxPool) Add(tx *types.Transaction) error { +func (self *TxPool) add(tx *types.Transaction) error { if self.txs[string(tx.Hash())] != nil { return fmt.Errorf("Known transaction (%x)", tx.Hash()[0:4]) } @@ -128,17 +130,28 @@ func (self *TxPool) Size() int { return len(self.txs) } +func (self *TxPool) Add(tx *types.Transaction) error { + self.mu.Lock() + defer self.mu.Unlock() + return self.add(tx) +} func (self *TxPool) AddTransactions(txs []*types.Transaction) { + self.mu.Lock() + defer self.mu.Unlock() + for _, tx := range txs { - if err := self.Add(tx); err != nil { - txplogger.Infoln(err) + if err := self.add(tx); err != nil { + txplogger.Debugln(err) } else { - txplogger.Infof("tx %x\n", tx.Hash()[0:4]) + txplogger.Debugf("tx %x\n", tx.Hash()[0:4]) } } } func (self *TxPool) GetTransactions() (txs types.Transactions) { + self.mu.RLock() + defer self.mu.RUnlock() + txs = make(types.Transactions, self.Size()) i := 0 for _, tx := range self.txs { @@ -150,30 +163,32 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { } func (pool *TxPool) RemoveInvalid(query StateQuery) { + pool.mu.Lock() + var removedTxs types.Transactions for _, tx := range pool.txs { sender := query.GetAccount(tx.From()) err := pool.ValidateTransaction(tx) - fmt.Println(err, sender.Nonce, tx.Nonce()) if err != nil || sender.Nonce >= tx.Nonce() { removedTxs = append(removedTxs, tx) } } + pool.mu.Unlock() pool.RemoveSet(removedTxs) } func (self *TxPool) RemoveSet(txs types.Transactions) { + self.mu.Lock() + defer self.mu.Unlock() + for _, tx := range txs { delete(self.txs, string(tx.Hash())) } } -func (pool *TxPool) Flush() []*types.Transaction { - txList := pool.GetTransactions() +func (pool *TxPool) Flush() { pool.txs = make(map[string]*types.Transaction) - - return txList } func (pool *TxPool) Start() { diff --git a/ethutil/common.go b/ethutil/common.go index 271c56fd5..2ef2440c7 100644 --- a/ethutil/common.go +++ b/ethutil/common.go @@ -4,6 +4,7 @@ import ( "fmt" "math/big" "runtime" + "time" ) func IsWindows() bool { @@ -86,3 +87,9 @@ var ( Big256 = big.NewInt(0xff) Big257 = big.NewInt(257) ) + +func Bench(pre string, cb func()) { + start := time.Now() + cb() + fmt.Println(pre, ": took:", time.Since(start)) +} diff --git a/miner/worker.go b/miner/worker.go index 47b462e53..1f3a52ab5 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -109,14 +109,18 @@ func (self *worker) register(agent Agent) { } func (self *worker) update() { - events := self.mux.Subscribe(core.ChainEvent{}, core.TxPreEvent{}) + events := self.mux.Subscribe(core.ChainEvent{}, core.NewMinedBlockEvent{}) out: for { select { case event := <-events.Chan(): - switch event.(type) { - case core.ChainEvent, core.TxPreEvent: + switch ev := event.(type) { + case core.ChainEvent: + if self.current.block != ev.Block { + self.commitNewWork() + } + case core.NewMinedBlockEvent: self.commitNewWork() } case <-self.quit: @@ -172,17 +176,19 @@ func (self *worker) commitNewWork() { transactions := self.eth.TxPool().GetTransactions() sort.Sort(types.TxByNonce{transactions}) + minerlogger.Infof("committing new work with %d txs\n", len(transactions)) // Keep track of transactions which return errors so they can be removed var remove types.Transactions +gasLimit: for _, tx := range transactions { err := self.commitTransaction(tx) switch { case core.IsNonceErr(err): // Remove invalid transactions remove = append(remove, tx) - case core.IsGasLimitErr(err): + case state.IsGasLimitErr(err): // Break on gas limit - break + break gasLimit } if err != nil { @@ -227,11 +233,9 @@ func (self *worker) commitUncle(uncle *types.Header) error { } func (self *worker) commitTransaction(tx *types.Transaction) error { - snapshot := self.current.state.Copy() + //fmt.Printf("proc %x %v\n", tx.Hash()[:3], tx.Nonce()) receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true) - if err != nil && (core.IsNonceErr(err) || core.IsGasLimitErr(err)) { - self.current.state.Set(snapshot) - + if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err)) { return err } diff --git a/pow/ezp/pow.go b/pow/ezp/pow.go index 540381243..f4a8b80e5 100644 --- a/pow/ezp/pow.go +++ b/pow/ezp/pow.go @@ -21,7 +21,7 @@ type EasyPow struct { } func New() *EasyPow { - return &EasyPow{turbo: false} + return &EasyPow{turbo: true} } func (pow *EasyPow) GetHashrate() int64 { diff --git a/rpc/packages.go b/rpc/packages.go index 7044d9d12..63eea54d6 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -180,6 +180,7 @@ func (p *EthereumApi) Transact(args *NewTxArgs, reply *interface{}) error { result, _ := p.xeth.Transact( /* TODO specify account */ args.To, args.Value, args.Gas, args.GasPrice, args.Data) *reply = result } + return nil } diff --git a/state/state_object.go b/state/state_object.go index eaa91c713..d50c9fd7a 100644 --- a/state/state_object.go +++ b/state/state_object.go @@ -53,6 +53,7 @@ type StateObject struct { // When an object is marked for deletion it will be delete from the trie // during the "update" phase of the state transition remove bool + dirty bool } func (self *StateObject) Reset() { @@ -211,6 +212,8 @@ func (self *StateObject) BuyGas(gas, price *big.Int) error { return GasLimitError(self.gasPool, gas) } + self.gasPool.Sub(self.gasPool, gas) + rGas := new(big.Int).Set(gas) rGas.Mul(rGas, price) @@ -241,6 +244,7 @@ func (self *StateObject) Copy() *StateObject { stateObject.storage = self.storage.Copy() stateObject.gasPool.Set(self.gasPool) stateObject.remove = self.remove + stateObject.dirty = self.dirty return stateObject } diff --git a/state/statedb.go b/state/statedb.go index c83d59ed7..8c8a21db9 100644 --- a/state/statedb.go +++ b/state/statedb.go @@ -78,13 +78,6 @@ func (self *StateDB) GetNonce(addr []byte) uint64 { return 0 } -func (self *StateDB) SetNonce(addr []byte, nonce uint64) { - stateObject := self.GetStateObject(addr) - if stateObject != nil { - stateObject.Nonce = nonce - } -} - func (self *StateDB) GetCode(addr []byte) []byte { stateObject := self.GetStateObject(addr) if stateObject != nil { @@ -94,14 +87,6 @@ func (self *StateDB) GetCode(addr []byte) []byte { return nil } -func (self *StateDB) SetCode(addr, code []byte) { - stateObject := self.GetStateObject(addr) - if stateObject != nil { - stateObject.SetCode(code) - } -} - -// TODO vars func (self *StateDB) GetState(a, b []byte) []byte { stateObject := self.GetStateObject(a) if stateObject != nil { @@ -111,10 +96,27 @@ func (self *StateDB) GetState(a, b []byte) []byte { return nil } +func (self *StateDB) SetNonce(addr []byte, nonce uint64) { + stateObject := self.GetStateObject(addr) + if stateObject != nil { + stateObject.Nonce = nonce + stateObject.dirty = true + } +} + +func (self *StateDB) SetCode(addr, code []byte) { + stateObject := self.GetStateObject(addr) + if stateObject != nil { + stateObject.SetCode(code) + stateObject.dirty = true + } +} + func (self *StateDB) SetState(addr, key []byte, value interface{}) { stateObject := self.GetStateObject(addr) if stateObject != nil { stateObject.SetState(key, ethutil.NewValue(value)) + stateObject.dirty = true } } @@ -122,6 +124,7 @@ func (self *StateDB) Delete(addr []byte) bool { stateObject := self.GetStateObject(addr) if stateObject != nil { stateObject.MarkForDeletion() + stateObject.dirty = true return true } @@ -282,16 +285,18 @@ func (self *StateDB) Refunds() map[string]*big.Int { } func (self *StateDB) Update(gasUsed *big.Int) { - self.refund = make(map[string]*big.Int) for _, stateObject := range self.stateObjects { - if stateObject.remove { - self.DeleteStateObject(stateObject) - } else { - stateObject.Sync() + if stateObject.dirty { + if stateObject.remove { + self.DeleteStateObject(stateObject) + } else { + stateObject.Sync() - self.UpdateStateObject(stateObject) + self.UpdateStateObject(stateObject) + } + stateObject.dirty = false } } } diff --git a/xeth/xeth.go b/xeth/xeth.go index f005105bb..d578c03c9 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -7,6 +7,7 @@ package xeth import ( "bytes" "encoding/json" + "fmt" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -241,7 +242,6 @@ func (self *XEth) Call(toStr, valueStr, gasStr, gasPriceStr, dataStr string) (st } func (self *XEth) Transact(toStr, valueStr, gasStr, gasPriceStr, codeStr string) (string, error) { - var ( to []byte value = ethutil.NewValue(valueStr) @@ -265,29 +265,32 @@ func (self *XEth) Transact(toStr, valueStr, gasStr, gasPriceStr, codeStr string) tx = types.NewTransactionMessage(to, value.BigInt(), gas.BigInt(), price.BigInt(), data) } - state := self.chainManager.TransState() + var err error + state := self.eth.ChainManager().TransState() + if balance := state.GetBalance(key.Address()); balance.Cmp(tx.Value()) < 0 { + return "", fmt.Errorf("insufficient balance. balance=%v tx=%v", balance, tx.Value()) + } nonce := state.GetNonce(key.Address()) tx.SetNonce(nonce) tx.Sign(key.PrivateKey) - // Do some pre processing for our "pre" events and hooks - block := self.chainManager.NewBlock(key.Address()) - coinbase := state.GetOrNewStateObject(key.Address()) - coinbase.SetGasPool(block.GasLimit()) - self.blockProcessor.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true) + //fmt.Printf("create tx: %x %v\n", tx.Hash()[:4], tx.Nonce()) - err := self.eth.TxPool().Add(tx) + /* + // Do some pre processing for our "pre" events and hooks + block := self.chainManager.NewBlock(key.Address()) + coinbase := state.GetOrNewStateObject(key.Address()) + coinbase.SetGasPool(block.GasLimit()) + self.blockProcessor.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true) + */ + + err = self.eth.TxPool().Add(tx) if err != nil { return "", err } state.SetNonce(key.Address(), nonce+1) - if contractCreation { - addr := core.AddressFromMessage(tx) - pipelogger.Infof("Contract addr %x\n", addr) - } - if types.IsContractAddr(to) { return toHex(core.AddressFromMessage(tx)), nil }