core: metrics collection for transaction events (#3157)

* core: Add metrics collection for transaction events; replace/discard for pending and future queues, as well as invalid transactions

* core: change namespace for txpool metrics

* core: define more metrics (not yet used)

* core: implement more tx metrics for when transactions are dropped

* core: minor formatting tweeks (will squash later)

* core: remove superfluous meter, fix missing pending nofunds

* core, metrics: switch txpool meters to counters
This commit is contained in:
Martin Holst Swende 2016-11-01 13:46:11 +01:00 committed by Péter Szilágyi
parent f4d878f3d8
commit 36956da4d2
2 changed files with 39 additions and 0 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque" "gopkg.in/karalabe/cookiejar.v2/collections/prque"
) )
@ -55,6 +56,23 @@ var (
evictionInterval = time.Minute // Time interval to check for evictable transactions evictionInterval = time.Minute // Time interval to check for evictable transactions
) )
var (
// Metrics for the pending pool
pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard")
pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace")
pendingRLCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting
pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds
// Metrics for the queued pool
queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard")
queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace")
queuedRLCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting
queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds
// General tx metrics
invalidTxCounter = metrics.NewCounter("txpool/invalid")
)
type stateFn func() (*state.StateDB, error) type stateFn func() (*state.StateDB, error)
// TxPool contains all currently known transactions. Transactions // TxPool contains all currently known transactions. Transactions
@ -306,6 +324,7 @@ func (pool *TxPool) add(tx *types.Transaction) error {
} }
// Otherwise ensure basic validation passes and queue it up // Otherwise ensure basic validation passes and queue it up
if err := pool.validateTx(tx); err != nil { if err := pool.validateTx(tx); err != nil {
invalidTxCounter.Inc(1)
return err return err
} }
pool.enqueueTx(hash, tx) pool.enqueueTx(hash, tx)
@ -333,11 +352,13 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
} }
inserted, old := pool.queue[from].Add(tx) inserted, old := pool.queue[from].Add(tx)
if !inserted { if !inserted {
queuedDiscardCounter.Inc(1)
return // An older transaction was better, discard this return // An older transaction was better, discard this
} }
// Discard any previous transaction and mark this // Discard any previous transaction and mark this
if old != nil { if old != nil {
delete(pool.all, old.Hash()) delete(pool.all, old.Hash())
queuedReplaceCounter.Inc(1)
} }
pool.all[hash] = tx pool.all[hash] = tx
} }
@ -360,11 +381,13 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
if !inserted { if !inserted {
// An older transaction was better, discard this // An older transaction was better, discard this
delete(pool.all, hash) delete(pool.all, hash)
pendingDiscardCounter.Inc(1)
return return
} }
// Otherwise discard any previous transaction and mark this // Otherwise discard any previous transaction and mark this
if old != nil { if old != nil {
delete(pool.all, old.Hash()) delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
} }
pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests) pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
@ -499,6 +522,7 @@ func (pool *TxPool) promoteExecutables() {
glog.Infof("Removed unpayable queued transaction: %v", tx) glog.Infof("Removed unpayable queued transaction: %v", tx)
} }
delete(pool.all, tx.Hash()) delete(pool.all, tx.Hash())
queuedNofundsCounter.Inc(1)
} }
// Gather all executable transactions and promote them // Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
@ -513,6 +537,7 @@ func (pool *TxPool) promoteExecutables() {
glog.Infof("Removed cap-exceeding queued transaction: %v", tx) glog.Infof("Removed cap-exceeding queued transaction: %v", tx)
} }
delete(pool.all, tx.Hash()) delete(pool.all, tx.Hash())
queuedRLCounter.Inc(1)
} }
queued += uint64(list.Len()) queued += uint64(list.Len())
@ -527,6 +552,7 @@ func (pool *TxPool) promoteExecutables() {
pending += uint64(list.Len()) pending += uint64(list.Len())
} }
if pending > maxPendingTotal { if pending > maxPendingTotal {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first // Assemble a spam order to penalize large transactors first
spammers := prque.New() spammers := prque.New()
for addr, list := range pool.pending { for addr, list := range pool.pending {
@ -573,6 +599,7 @@ func (pool *TxPool) promoteExecutables() {
} }
} }
} }
pendingRLCounter.Inc(int64(pendingBeforeCap - pending))
} }
// If we've queued more transactions than the hard limit, drop oldest ones // If we've queued more transactions than the hard limit, drop oldest ones
if queued > maxQueuedInTotal { if queued > maxQueuedInTotal {
@ -596,6 +623,7 @@ func (pool *TxPool) promoteExecutables() {
pool.removeTx(tx.Hash()) pool.removeTx(tx.Hash())
} }
drop -= size drop -= size
queuedRLCounter.Inc(int64(size))
continue continue
} }
// Otherwise drop only last few transactions // Otherwise drop only last few transactions
@ -603,6 +631,7 @@ func (pool *TxPool) promoteExecutables() {
for i := len(txs) - 1; i >= 0 && drop > 0; i-- { for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash()) pool.removeTx(txs[i].Hash())
drop-- drop--
queuedRLCounter.Inc(1)
} }
} }
} }
@ -636,6 +665,7 @@ func (pool *TxPool) demoteUnexecutables() {
glog.Infof("Removed unpayable pending transaction: %v", tx) glog.Infof("Removed unpayable pending transaction: %v", tx)
} }
delete(pool.all, tx.Hash()) delete(pool.all, tx.Hash())
pendingNofundsCounter.Inc(1)
} }
for _, tx := range invalids { for _, tx := range invalids {
if glog.V(logger.Core) { if glog.V(logger.Core) {

View File

@ -48,6 +48,15 @@ func init() {
exp.Exp(metrics.DefaultRegistry) exp.Exp(metrics.DefaultRegistry)
} }
// NewCounter create a new metrics Counter, either a real one of a NOP stub depending
// on the metrics flag.
func NewCounter(name string) metrics.Counter {
if !Enabled {
return new(metrics.NilCounter)
}
return metrics.GetOrRegisterCounter(name, metrics.DefaultRegistry)
}
// NewMeter create a new metrics Meter, either a real one of a NOP stub depending // NewMeter create a new metrics Meter, either a real one of a NOP stub depending
// on the metrics flag. // on the metrics flag.
func NewMeter(name string) metrics.Meter { func NewMeter(name string) metrics.Meter {