Merge pull request #1932 from fjl/gpo-defootgunize

eth, xeth: fix GasPriceOracle goroutine leak
This commit is contained in:
Jeffrey Wilcke 2015-10-28 10:32:35 +01:00
commit 2e4fdce743
2 changed files with 75 additions and 73 deletions

View File

@ -23,49 +23,66 @@ import (
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
) )
const gpoProcessPastBlocks = 100 const (
gpoProcessPastBlocks = 100
// for testing
gpoDefaultBaseCorrectionFactor = 110
gpoDefaultMinGasPrice = 10000000000000
)
type blockPriceInfo struct { type blockPriceInfo struct {
baseGasPrice *big.Int baseGasPrice *big.Int
} }
// GasPriceOracle recommends gas prices based on the content of recent
// blocks.
type GasPriceOracle struct { type GasPriceOracle struct {
eth *Ethereum eth *Ethereum
chain *core.BlockChain initOnce sync.Once
events event.Subscription minPrice *big.Int
lastBaseMutex sync.Mutex
lastBase *big.Int
// state of listenLoop
blocks map[uint64]*blockPriceInfo blocks map[uint64]*blockPriceInfo
firstProcessed, lastProcessed uint64 firstProcessed, lastProcessed uint64
lastBaseMutex sync.Mutex minBase *big.Int
lastBase, minBase *big.Int
} }
func NewGasPriceOracle(eth *Ethereum) (self *GasPriceOracle) { // NewGasPriceOracle returns a new oracle.
self = &GasPriceOracle{} func NewGasPriceOracle(eth *Ethereum) *GasPriceOracle {
self.blocks = make(map[uint64]*blockPriceInfo) minprice := eth.GpoMinGasPrice
self.eth = eth if minprice == nil {
self.chain = eth.blockchain minprice = big.NewInt(gpoDefaultMinGasPrice)
self.events = eth.EventMux().Subscribe( }
core.ChainEvent{}, minbase := new(big.Int).Mul(minprice, big.NewInt(100))
core.ChainSplitEvent{}, if eth.GpobaseCorrectionFactor > 0 {
) minbase = minbase.Div(minbase, big.NewInt(int64(eth.GpobaseCorrectionFactor)))
}
minbase := new(big.Int).Mul(self.eth.GpoMinGasPrice, big.NewInt(100)) return &GasPriceOracle{
minbase = minbase.Div(minbase, big.NewInt(int64(self.eth.GpobaseCorrectionFactor))) eth: eth,
self.minBase = minbase blocks: make(map[uint64]*blockPriceInfo),
minBase: minbase,
self.processPastBlocks() minPrice: minprice,
go self.listenLoop() lastBase: minprice,
return }
} }
func (self *GasPriceOracle) processPastBlocks() { func (gpo *GasPriceOracle) init() {
gpo.initOnce.Do(func() {
gpo.processPastBlocks(gpo.eth.BlockChain())
go gpo.listenLoop()
})
}
func (self *GasPriceOracle) processPastBlocks(chain *core.BlockChain) {
last := int64(-1) last := int64(-1)
cblock := self.chain.CurrentBlock() cblock := chain.CurrentBlock()
if cblock != nil { if cblock != nil {
last = int64(cblock.NumberU64()) last = int64(cblock.NumberU64())
} }
@ -75,7 +92,7 @@ func (self *GasPriceOracle) processPastBlocks() {
} }
self.firstProcessed = uint64(first) self.firstProcessed = uint64(first)
for i := first; i <= last; i++ { for i := first; i <= last; i++ {
block := self.chain.GetBlockByNumber(uint64(i)) block := chain.GetBlockByNumber(uint64(i))
if block != nil { if block != nil {
self.processBlock(block) self.processBlock(block)
} }
@ -84,9 +101,10 @@ func (self *GasPriceOracle) processPastBlocks() {
} }
func (self *GasPriceOracle) listenLoop() { func (self *GasPriceOracle) listenLoop() {
defer self.events.Unsubscribe() events := self.eth.EventMux().Subscribe(core.ChainEvent{}, core.ChainSplitEvent{})
defer events.Unsubscribe()
for event := range self.events.Chan() { for event := range events.Chan() {
switch event := event.Data.(type) { switch event := event.Data.(type) {
case core.ChainEvent: case core.ChainEvent:
self.processBlock(event.Block) self.processBlock(event.Block)
@ -102,7 +120,7 @@ func (self *GasPriceOracle) processBlock(block *types.Block) {
self.lastProcessed = i self.lastProcessed = i
} }
lastBase := self.eth.GpoMinGasPrice lastBase := self.minPrice
bpl := self.blocks[i-1] bpl := self.blocks[i-1]
if bpl != nil { if bpl != nil {
lastBase = bpl.baseGasPrice lastBase = bpl.baseGasPrice
@ -176,28 +194,19 @@ func (self *GasPriceOracle) lowestPrice(block *types.Block) *big.Int {
return minPrice return minPrice
} }
// SuggestPrice returns the recommended gas price.
func (self *GasPriceOracle) SuggestPrice() *big.Int { func (self *GasPriceOracle) SuggestPrice() *big.Int {
self.init()
self.lastBaseMutex.Lock() self.lastBaseMutex.Lock()
base := self.lastBase price := new(big.Int).Set(self.lastBase)
self.lastBaseMutex.Unlock() self.lastBaseMutex.Unlock()
if base == nil { price.Mul(price, big.NewInt(int64(self.eth.GpobaseCorrectionFactor)))
base = self.eth.GpoMinGasPrice price.Div(price, big.NewInt(100))
if price.Cmp(self.minPrice) < 0 {
price.Set(self.minPrice)
} else if self.eth.GpoMaxGasPrice != nil && price.Cmp(self.eth.GpoMaxGasPrice) > 0 {
price.Set(self.eth.GpoMaxGasPrice)
} }
if base == nil { return price
return big.NewInt(10000000000000) // apparently MinGasPrice is not initialized during some tests
}
baseCorr := new(big.Int).Mul(base, big.NewInt(int64(self.eth.GpobaseCorrectionFactor)))
baseCorr.Div(baseCorr, big.NewInt(100))
if baseCorr.Cmp(self.eth.GpoMinGasPrice) < 0 {
return self.eth.GpoMinGasPrice
}
if baseCorr.Cmp(self.eth.GpoMaxGasPrice) > 0 {
return self.eth.GpoMaxGasPrice
}
return baseCorr
} }

View File

@ -59,24 +59,8 @@ const (
LogFilterTy LogFilterTy
) )
func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) }
func (self *XEth) DefaultGasPrice() *big.Int {
if self.gpo == nil {
self.gpo = eth.NewGasPriceOracle(self.backend)
}
return self.gpo.SuggestPrice()
}
type XEth struct { type XEth struct {
backend *eth.Ethereum quit chan struct{}
frontend Frontend
state *State
whisper *Whisper
quit chan struct{}
filterManager *filters.FilterSystem
logMu sync.RWMutex logMu sync.RWMutex
logQueue map[int]*logQueue logQueue map[int]*logQueue
@ -92,16 +76,18 @@ type XEth struct {
transactMu sync.Mutex transactMu sync.Mutex
agent *miner.RemoteAgent // read-only fields
backend *eth.Ethereum
gpo *eth.GasPriceOracle frontend Frontend
agent *miner.RemoteAgent
gpo *eth.GasPriceOracle
state *State
whisper *Whisper
filterManager *filters.FilterSystem
} }
func NewTest(eth *eth.Ethereum, frontend Frontend) *XEth { func NewTest(eth *eth.Ethereum, frontend Frontend) *XEth {
return &XEth{ return &XEth{backend: eth, frontend: frontend}
backend: eth,
frontend: frontend,
}
} }
// New creates an XEth that uses the given frontend. // New creates an XEth that uses the given frontend.
@ -118,6 +104,7 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
transactionQueue: make(map[int]*hashQueue), transactionQueue: make(map[int]*hashQueue),
messages: make(map[int]*whisperFilter), messages: make(map[int]*whisperFilter),
agent: miner.NewRemoteAgent(), agent: miner.NewRemoteAgent(),
gpo: eth.NewGasPriceOracle(ethereum),
} }
if ethereum.Whisper() != nil { if ethereum.Whisper() != nil {
xeth.whisper = NewWhisper(ethereum.Whisper()) xeth.whisper = NewWhisper(ethereum.Whisper())
@ -207,6 +194,12 @@ func cTopics(t [][]string) [][]common.Hash {
return topics return topics
} }
func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) }
func (self *XEth) DefaultGasPrice() *big.Int {
return self.gpo.SuggestPrice()
}
func (self *XEth) RemoteMining() *miner.RemoteAgent { return self.agent } func (self *XEth) RemoteMining() *miner.RemoteAgent { return self.agent }
func (self *XEth) AtStateNum(num int64) *XEth { func (self *XEth) AtStateNum(num int64) *XEth {