From 17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 25 Aug 2015 13:57:49 +0300 Subject: [PATCH] eth: update metrics collection to handle eth/62 algos --- cmd/geth/monitorcmd.go | 2 +- eth/downloader/downloader.go | 44 +++++++++++++++++++++++++++++++---- eth/downloader/metrics.go | 45 ++++++++++++++++++++++++++++++++++++ eth/downloader/queue.go | 12 ++++++++++ eth/fetcher/fetcher.go | 35 ++++++++++++++++++++-------- eth/fetcher/metrics.go | 26 ++++++++++++++++----- eth/metrics.go | 32 ++++++++++++------------- 7 files changed, 160 insertions(+), 36 deletions(-) create mode 100644 eth/downloader/metrics.go diff --git a/cmd/geth/monitorcmd.go b/cmd/geth/monitorcmd.go index a7c099532..a45d29b8f 100644 --- a/cmd/geth/monitorcmd.go +++ b/cmd/geth/monitorcmd.go @@ -289,7 +289,7 @@ func updateChart(metric string, data []float64, base *int, chart *termui.LineCha } } unit, scale := 0, 1.0 - for high >= 1000 { + for high >= 1000 && unit+1 < len(dataUnits) { high, unit, scale = high/1000, unit+1, scale*1000 } // If the unit changes, re-create the chart (hack to set max height...) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0e8529756..574f2ba15 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -526,6 +526,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from) // Create a timeout timer, and the associated hash fetcher + request := time.Now() // time of the last fetch request timeout := time.NewTimer(0) // timer to dump a non-responsive active peer <-timeout.C // timeout channel should be initially empty defer timeout.Stop() @@ -534,6 +535,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from) go p.getAbsHashes(from, MaxHashFetch) + request = time.Now() timeout.Reset(hashTTL) } // Start pulling hashes, until all are exhausted @@ -557,6 +559,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) break } + hashReqTimer.UpdateSince(request) timeout.Stop() // If no more hashes are inbound, notify the block fetcher and return @@ -609,6 +612,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { case <-timeout.C: glog.V(logger.Debug).Infof("%v: hash request timed out", p) + hashTimeoutMeter.Mark(1) return errTimeout } } @@ -896,6 +900,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { defer glog.V(logger.Debug).Infof("%v: header download terminated", p) // Create a timeout timer, and the associated hash fetcher + request := time.Now() // time of the last fetch request timeout := time.NewTimer(0) // timer to dump a non-responsive active peer <-timeout.C // timeout channel should be initially empty defer timeout.Stop() @@ -904,6 +909,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from) go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) + request = time.Now() timeout.Reset(headerTTL) } // Start pulling headers, until all are exhausted @@ -927,6 +933,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId) break } + headerReqTimer.UpdateSince(request) timeout.Stop() // If no more headers are inbound, notify the body fetcher and return @@ -980,6 +987,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { case <-timeout.C: // Header retrieval timed out, consider the peer bad and drop glog.V(logger.Debug).Infof("%v: header request timed out", p) + headerTimeoutMeter.Mark(1) d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though @@ -1244,7 +1252,14 @@ func (d *Downloader) process() { // DeliverHashes61 injects a new batch of hashes received from a remote node into // the download schedule. This is usually invoked through the BlockHashesMsg by // the protocol handler. -func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) error { +func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) (err error) { + // Update the delivery metrics for both good and failed deliveries + hashInMeter.Mark(int64(len(hashes))) + defer func() { + if err != nil { + hashDropMeter.Mark(int64(len(hashes))) + } + }() // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive @@ -1265,7 +1280,14 @@ func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) error { // DeliverBlocks61 injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. -func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) error { +func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) (err error) { + // Update the delivery metrics for both good and failed deliveries + blockInMeter.Mark(int64(len(blocks))) + defer func() { + if err != nil { + blockDropMeter.Mark(int64(len(blocks))) + } + }() // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive @@ -1286,7 +1308,14 @@ func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) error { // DeliverHeaders injects a new batch of blck headers received from a remote // node into the download schedule. -func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error { +func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { + // Update the delivery metrics for both good and failed deliveries + headerInMeter.Mark(int64(len(headers))) + defer func() { + if err != nil { + headerDropMeter.Mark(int64(len(headers))) + } + }() // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive @@ -1306,7 +1335,14 @@ func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error { } // DeliverBodies injects a new batch of block bodies received from a remote node. -func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error { +func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { + // Update the delivery metrics for both good and failed deliveries + bodyInMeter.Mark(int64(len(transactions))) + defer func() { + if err != nil { + bodyDropMeter.Mark(int64(len(transactions))) + } + }() // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go new file mode 100644 index 000000000..fd926affd --- /dev/null +++ b/eth/downloader/metrics.go @@ -0,0 +1,45 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Contains the metrics collected by the downloader. + +package downloader + +import ( + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + hashInMeter = metrics.NewMeter("eth/downloader/hashes/in") + hashReqTimer = metrics.NewTimer("eth/downloader/hashes/req") + hashDropMeter = metrics.NewMeter("eth/downloader/hashes/drop") + hashTimeoutMeter = metrics.NewMeter("eth/downloader/hashes/timeout") + + blockInMeter = metrics.NewMeter("eth/downloader/blocks/in") + blockReqTimer = metrics.NewTimer("eth/downloader/blocks/req") + blockDropMeter = metrics.NewMeter("eth/downloader/blocks/drop") + blockTimeoutMeter = metrics.NewMeter("eth/downloader/blocks/timeout") + + headerInMeter = metrics.NewMeter("eth/downloader/headers/in") + headerReqTimer = metrics.NewTimer("eth/downloader/headers/req") + headerDropMeter = metrics.NewMeter("eth/downloader/headers/drop") + headerTimeoutMeter = metrics.NewMeter("eth/downloader/headers/timeout") + + bodyInMeter = metrics.NewMeter("eth/downloader/bodies/in") + bodyReqTimer = metrics.NewTimer("eth/downloader/bodies/req") + bodyDropMeter = metrics.NewMeter("eth/downloader/bodies/drop") + bodyTimeoutMeter = metrics.NewMeter("eth/downloader/bodies/timeout") +) diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index a527414ff..7db78327b 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -397,9 +397,19 @@ func (q *queue) Expire(timeout time.Duration) []string { peers := []string{} for id, request := range q.pendPool { if time.Since(request.Time) > timeout { + // Update the metrics with the timeout + if len(request.Hashes) > 0 { + blockTimeoutMeter.Mark(1) + } else { + bodyTimeoutMeter.Mark(1) + } + // Return any non satisfied requests to the pool for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + for _, header := range request.Headers { + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } peers = append(peers, id) } } @@ -420,6 +430,7 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { if request == nil { return errNoFetchesPending } + blockReqTimer.UpdateSince(request.Time) delete(q.pendPool, id) // If no blocks were retrieved, mark them as unavailable for the origin peer @@ -468,6 +479,7 @@ func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [] if request == nil { return errNoFetchesPending } + bodyReqTimer.UpdateSince(request.Time) delete(q.pendPool, id) // If no block bodies were retrieved, mark them as unavailable for the origin peer diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index f54256788..b8ec1fc55 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -347,18 +347,19 @@ func (f *Fetcher) loop() { case notification := <-f.notify: // A block was announced, make sure the peer isn't DOSing us - announceMeter.Mark(1) + propAnnounceInMeter.Mark(1) count := f.announces[notification.origin] + 1 if count > hashLimit { glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit) + propAnnounceDOSMeter.Mark(1) break } // If we have a valid block number, check that it's potentially useful if notification.number > 0 { if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist) - discardMeter.Mark(1) + propAnnounceDropMeter.Mark(1) break } } @@ -377,7 +378,7 @@ func (f *Fetcher) loop() { case op := <-f.inject: // A direct block insertion was requested, try and fill any pending gaps - broadcastMeter.Mark(1) + propBroadcastInMeter.Mark(1) f.enqueue(op.origin, op.block) case hash := <-f.done: @@ -425,10 +426,12 @@ func (f *Fetcher) loop() { } if fetchBlocks != nil { // Use old eth/61 protocol to retrieve whole blocks + blockFetchMeter.Mark(int64(len(hashes))) fetchBlocks(hashes) } else { // Use new eth/62 protocol to retrieve headers first for _, hash := range hashes { + headerFetchMeter.Mark(1) fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals } } @@ -467,6 +470,7 @@ func (f *Fetcher) loop() { if f.completingHook != nil { f.completingHook(hashes) } + bodyFetchMeter.Mark(int64(len(hashes))) go f.completing[hashes[0]].fetchBodies(hashes) } // Schedule the next fetch if blocks are still pending @@ -480,6 +484,7 @@ func (f *Fetcher) loop() { case <-f.quit: return } + blockFilterInMeter.Mark(int64(len(blocks))) explicit, download := []*types.Block{}, []*types.Block{} for _, block := range blocks { @@ -498,6 +503,7 @@ func (f *Fetcher) loop() { } } + blockFilterOutMeter.Mark(int64(len(download))) select { case filter <- download: case <-f.quit: @@ -520,6 +526,8 @@ func (f *Fetcher) loop() { case <-f.quit: return } + headerFilterInMeter.Mark(int64(len(task.headers))) + // Split the batch of headers into unknown ones (to return to the caller), // known incomplete ones (requiring body retrievals) and completed blocks. unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{} @@ -544,7 +552,10 @@ func (f *Fetcher) loop() { if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4]) - complete = append(complete, types.NewBlockWithHeader(header)) + block := types.NewBlockWithHeader(header) + block.ReceivedAt = task.time + + complete = append(complete, block) f.completing[hash] = announce continue } @@ -559,6 +570,7 @@ func (f *Fetcher) loop() { unknown = append(unknown, header) } } + headerFilterOutMeter.Mark(int64(len(unknown))) select { case filter <- &headerFilterTask{headers: unknown, time: task.time}: case <-f.quit: @@ -590,6 +602,7 @@ func (f *Fetcher) loop() { case <-f.quit: return } + bodyFilterInMeter.Mark(int64(len(task.transactions))) blocks := []*types.Block{} for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ { @@ -606,7 +619,10 @@ func (f *Fetcher) loop() { matched = true if f.getBlock(hash) == nil { - blocks = append(blocks, types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])) + block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) + block.ReceivedAt = task.time + + blocks = append(blocks, block) } else { f.forgetHash(hash) } @@ -621,6 +637,7 @@ func (f *Fetcher) loop() { } } + bodyFilterOutMeter.Mark(int64(len(task.transactions))) select { case filter <- task: case <-f.quit: @@ -677,13 +694,14 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { count := f.queues[peer] + 1 if count > blockLimit { glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit) + propBroadcastDOSMeter.Mark(1) f.forgetHash(hash) return } // Discard any past or too distant blocks if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist) - discardMeter.Mark(1) + propBroadcastDropMeter.Mark(1) f.forgetHash(hash) return } @@ -724,11 +742,10 @@ func (f *Fetcher) insert(peer string, block *types.Block) { switch err := f.validateBlock(block, parent); err { case nil: // All ok, quickly propagate to our peers - broadcastTimer.UpdateSince(block.ReceivedAt) + propBroadcastOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, true) case core.BlockFutureErr: - futureMeter.Mark(1) // Weird future block, don't fail, but neither propagate default: @@ -743,7 +760,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) { return } // If import succeeded, broadcast the block - announceTimer.UpdateSince(block.ReceivedAt) + propAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, false) // Invoke the testing hook if needed diff --git a/eth/fetcher/metrics.go b/eth/fetcher/metrics.go index 76cc49226..b82d3ca01 100644 --- a/eth/fetcher/metrics.go +++ b/eth/fetcher/metrics.go @@ -23,10 +23,24 @@ import ( ) var ( - announceMeter = metrics.NewMeter("eth/sync/RemoteAnnounces") - announceTimer = metrics.NewTimer("eth/sync/LocalAnnounces") - broadcastMeter = metrics.NewMeter("eth/sync/RemoteBroadcasts") - broadcastTimer = metrics.NewTimer("eth/sync/LocalBroadcasts") - discardMeter = metrics.NewMeter("eth/sync/DiscardedBlocks") - futureMeter = metrics.NewMeter("eth/sync/FutureBlocks") + propAnnounceInMeter = metrics.NewMeter("eth/fetcher/prop/announces/in") + propAnnounceOutTimer = metrics.NewTimer("eth/fetcher/prop/announces/out") + propAnnounceDropMeter = metrics.NewMeter("eth/fetcher/prop/announces/drop") + propAnnounceDOSMeter = metrics.NewMeter("eth/fetcher/prop/announces/dos") + + propBroadcastInMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/in") + propBroadcastOutTimer = metrics.NewTimer("eth/fetcher/prop/broadcasts/out") + propBroadcastDropMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/drop") + propBroadcastDOSMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/dos") + + blockFetchMeter = metrics.NewMeter("eth/fetcher/fetch/blocks") + headerFetchMeter = metrics.NewMeter("eth/fetcher/fetch/headers") + bodyFetchMeter = metrics.NewMeter("eth/fetcher/fetch/bodies") + + blockFilterInMeter = metrics.NewMeter("eth/fetcher/filter/blocks/in") + blockFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/blocks/out") + headerFilterInMeter = metrics.NewMeter("eth/fetcher/filter/headers/in") + headerFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/headers/out") + bodyFilterInMeter = metrics.NewMeter("eth/fetcher/filter/bodies/in") + bodyFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/bodies/out") ) diff --git a/eth/metrics.go b/eth/metrics.go index 21002094c..cfab3bcb3 100644 --- a/eth/metrics.go +++ b/eth/metrics.go @@ -42,22 +42,22 @@ var ( reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic") reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets") reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic") - reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/header/in/packets") - reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/header/in/traffic") - reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/header/out/packets") - reqHeaderOutTrafficMeter = metrics.NewMeter("eth/req/header/out/traffic") - reqBodyInPacketsMeter = metrics.NewMeter("eth/req/body/in/packets") - reqBodyInTrafficMeter = metrics.NewMeter("eth/req/body/in/traffic") - reqBodyOutPacketsMeter = metrics.NewMeter("eth/req/body/out/packets") - reqBodyOutTrafficMeter = metrics.NewMeter("eth/req/body/out/traffic") - reqStateInPacketsMeter = metrics.NewMeter("eth/req/state/in/packets") - reqStateInTrafficMeter = metrics.NewMeter("eth/req/state/in/traffic") - reqStateOutPacketsMeter = metrics.NewMeter("eth/req/state/out/packets") - reqStateOutTrafficMeter = metrics.NewMeter("eth/req/state/out/traffic") - reqReceiptInPacketsMeter = metrics.NewMeter("eth/req/receipt/in/packets") - reqReceiptInTrafficMeter = metrics.NewMeter("eth/req/receipt/in/traffic") - reqReceiptOutPacketsMeter = metrics.NewMeter("eth/req/receipt/out/packets") - reqReceiptOutTrafficMeter = metrics.NewMeter("eth/req/receipt/out/traffic") + reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/headers/in/packets") + reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/headers/in/traffic") + reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/headers/out/packets") + reqHeaderOutTrafficMeter = metrics.NewMeter("eth/req/headers/out/traffic") + reqBodyInPacketsMeter = metrics.NewMeter("eth/req/bodies/in/packets") + reqBodyInTrafficMeter = metrics.NewMeter("eth/req/bodies/in/traffic") + reqBodyOutPacketsMeter = metrics.NewMeter("eth/req/bodies/out/packets") + reqBodyOutTrafficMeter = metrics.NewMeter("eth/req/bodies/out/traffic") + reqStateInPacketsMeter = metrics.NewMeter("eth/req/states/in/packets") + reqStateInTrafficMeter = metrics.NewMeter("eth/req/states/in/traffic") + reqStateOutPacketsMeter = metrics.NewMeter("eth/req/states/out/packets") + reqStateOutTrafficMeter = metrics.NewMeter("eth/req/states/out/traffic") + reqReceiptInPacketsMeter = metrics.NewMeter("eth/req/receipts/in/packets") + reqReceiptInTrafficMeter = metrics.NewMeter("eth/req/receipts/in/traffic") + reqReceiptOutPacketsMeter = metrics.NewMeter("eth/req/receipts/out/packets") + reqReceiptOutTrafficMeter = metrics.NewMeter("eth/req/receipts/out/traffic") miscInPacketsMeter = metrics.NewMeter("eth/misc/in/packets") miscInTrafficMeter = metrics.NewMeter("eth/misc/in/traffic") miscOutPacketsMeter = metrics.NewMeter("eth/misc/out/packets")