eth, xeth: fix GasPriceOracle goroutine leak

XEth.gpo was being initialized as needed. WithState copies the XEth
struct including the gpo field. If gpo was nil at the time of the copy
and Call or Transact were invoked on it, an additional GPO listenLoop
would be spawned.

Move the lazy initialization to GasPriceOracle instead so the same GPO
instance is shared among all created XEths.

Fixes #1317
Might help with #1930
This commit is contained in:
Felix Lange 2015-10-26 21:42:24 +01:00
parent 77878f76a9
commit ae1b5b3ff2
2 changed files with 75 additions and 73 deletions

View File

@ -23,49 +23,66 @@ import (
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
) )
const gpoProcessPastBlocks = 100 const (
gpoProcessPastBlocks = 100
// for testing
gpoDefaultBaseCorrectionFactor = 110
gpoDefaultMinGasPrice = 10000000000000
)
type blockPriceInfo struct { type blockPriceInfo struct {
baseGasPrice *big.Int baseGasPrice *big.Int
} }
// GasPriceOracle recommends gas prices based on the content of recent
// blocks.
type GasPriceOracle struct { type GasPriceOracle struct {
eth *Ethereum eth *Ethereum
chain *core.BlockChain initOnce sync.Once
events event.Subscription minPrice *big.Int
lastBaseMutex sync.Mutex
lastBase *big.Int
// state of listenLoop
blocks map[uint64]*blockPriceInfo blocks map[uint64]*blockPriceInfo
firstProcessed, lastProcessed uint64 firstProcessed, lastProcessed uint64
lastBaseMutex sync.Mutex minBase *big.Int
lastBase, minBase *big.Int
} }
func NewGasPriceOracle(eth *Ethereum) (self *GasPriceOracle) { // NewGasPriceOracle returns a new oracle.
self = &GasPriceOracle{} func NewGasPriceOracle(eth *Ethereum) *GasPriceOracle {
self.blocks = make(map[uint64]*blockPriceInfo) minprice := eth.GpoMinGasPrice
self.eth = eth if minprice == nil {
self.chain = eth.blockchain minprice = big.NewInt(gpoDefaultMinGasPrice)
self.events = eth.EventMux().Subscribe( }
core.ChainEvent{}, minbase := new(big.Int).Mul(minprice, big.NewInt(100))
core.ChainSplitEvent{}, if eth.GpobaseCorrectionFactor > 0 {
) minbase = minbase.Div(minbase, big.NewInt(int64(eth.GpobaseCorrectionFactor)))
}
minbase := new(big.Int).Mul(self.eth.GpoMinGasPrice, big.NewInt(100)) return &GasPriceOracle{
minbase = minbase.Div(minbase, big.NewInt(int64(self.eth.GpobaseCorrectionFactor))) eth: eth,
self.minBase = minbase blocks: make(map[uint64]*blockPriceInfo),
minBase: minbase,
self.processPastBlocks() minPrice: minprice,
go self.listenLoop() lastBase: minprice,
return }
} }
func (self *GasPriceOracle) processPastBlocks() { func (gpo *GasPriceOracle) init() {
gpo.initOnce.Do(func() {
gpo.processPastBlocks(gpo.eth.BlockChain())
go gpo.listenLoop()
})
}
func (self *GasPriceOracle) processPastBlocks(chain *core.BlockChain) {
last := int64(-1) last := int64(-1)
cblock := self.chain.CurrentBlock() cblock := chain.CurrentBlock()
if cblock != nil { if cblock != nil {
last = int64(cblock.NumberU64()) last = int64(cblock.NumberU64())
} }
@ -75,7 +92,7 @@ func (self *GasPriceOracle) processPastBlocks() {
} }
self.firstProcessed = uint64(first) self.firstProcessed = uint64(first)
for i := first; i <= last; i++ { for i := first; i <= last; i++ {
block := self.chain.GetBlockByNumber(uint64(i)) block := chain.GetBlockByNumber(uint64(i))
if block != nil { if block != nil {
self.processBlock(block) self.processBlock(block)
} }
@ -84,9 +101,10 @@ func (self *GasPriceOracle) processPastBlocks() {
} }
func (self *GasPriceOracle) listenLoop() { func (self *GasPriceOracle) listenLoop() {
defer self.events.Unsubscribe() events := self.eth.EventMux().Subscribe(core.ChainEvent{}, core.ChainSplitEvent{})
defer events.Unsubscribe()
for event := range self.events.Chan() { for event := range events.Chan() {
switch event := event.Data.(type) { switch event := event.Data.(type) {
case core.ChainEvent: case core.ChainEvent:
self.processBlock(event.Block) self.processBlock(event.Block)
@ -102,7 +120,7 @@ func (self *GasPriceOracle) processBlock(block *types.Block) {
self.lastProcessed = i self.lastProcessed = i
} }
lastBase := self.eth.GpoMinGasPrice lastBase := self.minPrice
bpl := self.blocks[i-1] bpl := self.blocks[i-1]
if bpl != nil { if bpl != nil {
lastBase = bpl.baseGasPrice lastBase = bpl.baseGasPrice
@ -176,28 +194,19 @@ func (self *GasPriceOracle) lowestPrice(block *types.Block) *big.Int {
return minPrice return minPrice
} }
// SuggestPrice returns the recommended gas price.
func (self *GasPriceOracle) SuggestPrice() *big.Int { func (self *GasPriceOracle) SuggestPrice() *big.Int {
self.init()
self.lastBaseMutex.Lock() self.lastBaseMutex.Lock()
base := self.lastBase price := new(big.Int).Set(self.lastBase)
self.lastBaseMutex.Unlock() self.lastBaseMutex.Unlock()
if base == nil { price.Mul(price, big.NewInt(int64(self.eth.GpobaseCorrectionFactor)))
base = self.eth.GpoMinGasPrice price.Div(price, big.NewInt(100))
if price.Cmp(self.minPrice) < 0 {
price.Set(self.minPrice)
} else if self.eth.GpoMaxGasPrice != nil && price.Cmp(self.eth.GpoMaxGasPrice) > 0 {
price.Set(self.eth.GpoMaxGasPrice)
} }
if base == nil { return price
return big.NewInt(10000000000000) // apparently MinGasPrice is not initialized during some tests
}
baseCorr := new(big.Int).Mul(base, big.NewInt(int64(self.eth.GpobaseCorrectionFactor)))
baseCorr.Div(baseCorr, big.NewInt(100))
if baseCorr.Cmp(self.eth.GpoMinGasPrice) < 0 {
return self.eth.GpoMinGasPrice
}
if baseCorr.Cmp(self.eth.GpoMaxGasPrice) > 0 {
return self.eth.GpoMaxGasPrice
}
return baseCorr
} }

View File

@ -59,24 +59,8 @@ const (
LogFilterTy LogFilterTy
) )
func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) }
func (self *XEth) DefaultGasPrice() *big.Int {
if self.gpo == nil {
self.gpo = eth.NewGasPriceOracle(self.backend)
}
return self.gpo.SuggestPrice()
}
type XEth struct { type XEth struct {
backend *eth.Ethereum quit chan struct{}
frontend Frontend
state *State
whisper *Whisper
quit chan struct{}
filterManager *filters.FilterSystem
logMu sync.RWMutex logMu sync.RWMutex
logQueue map[int]*logQueue logQueue map[int]*logQueue
@ -92,16 +76,18 @@ type XEth struct {
transactMu sync.Mutex transactMu sync.Mutex
agent *miner.RemoteAgent // read-only fields
backend *eth.Ethereum
gpo *eth.GasPriceOracle frontend Frontend
agent *miner.RemoteAgent
gpo *eth.GasPriceOracle
state *State
whisper *Whisper
filterManager *filters.FilterSystem
} }
func NewTest(eth *eth.Ethereum, frontend Frontend) *XEth { func NewTest(eth *eth.Ethereum, frontend Frontend) *XEth {
return &XEth{ return &XEth{backend: eth, frontend: frontend}
backend: eth,
frontend: frontend,
}
} }
// New creates an XEth that uses the given frontend. // New creates an XEth that uses the given frontend.
@ -118,6 +104,7 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
transactionQueue: make(map[int]*hashQueue), transactionQueue: make(map[int]*hashQueue),
messages: make(map[int]*whisperFilter), messages: make(map[int]*whisperFilter),
agent: miner.NewRemoteAgent(), agent: miner.NewRemoteAgent(),
gpo: eth.NewGasPriceOracle(ethereum),
} }
if ethereum.Whisper() != nil { if ethereum.Whisper() != nil {
xeth.whisper = NewWhisper(ethereum.Whisper()) xeth.whisper = NewWhisper(ethereum.Whisper())
@ -207,6 +194,12 @@ func cTopics(t [][]string) [][]common.Hash {
return topics return topics
} }
func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) }
func (self *XEth) DefaultGasPrice() *big.Int {
return self.gpo.SuggestPrice()
}
func (self *XEth) RemoteMining() *miner.RemoteAgent { return self.agent } func (self *XEth) RemoteMining() *miner.RemoteAgent { return self.agent }
func (self *XEth) AtStateNum(num int64) *XEth { func (self *XEth) AtStateNum(num int64) *XEth {