diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 52dcf9401..6bbaa2f55 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -223,7 +223,7 @@ func handleMessage(backend Backend, peer *Peer) error { if peer.Version() >= ETH66 { handlers = eth66 } - // Track the emount of time it takes to serve the request and run the handler + // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) defer func(start time.Time) { diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 8433fa343..d0dec7b0b 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -327,6 +327,8 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + requestTracker.Fulfil(peer.id, peer.version, BlockHeadersMsg, res.RequestId) + return backend.Handle(peer, &res.BlockHeadersPacket) } @@ -345,6 +347,8 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + requestTracker.Fulfil(peer.id, peer.version, BlockBodiesMsg, res.RequestId) + return backend.Handle(peer, &res.BlockBodiesPacket) } @@ -363,6 +367,8 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + requestTracker.Fulfil(peer.id, peer.version, NodeDataMsg, res.RequestId) + return backend.Handle(peer, &res.NodeDataPacket) } @@ -381,6 +387,8 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + requestTracker.Fulfil(peer.id, peer.version, ReceiptsMsg, res.RequestId) + return backend.Handle(peer, &res.ReceiptsPacket) } @@ -506,5 +514,7 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error } peer.markTransaction(tx.Hash()) } + requestTracker.Fulfil(peer.id, peer.version, PooledTransactionsMsg, txs.RequestId) + return backend.Handle(peer, &txs.PooledTransactionsPacket) } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 709fca865..e619c183b 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -413,8 +413,11 @@ func (p *Peer) RequestOneHeader(hash common.Hash) error { Reverse: false, } if p.Version() >= ETH66 { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ - RequestId: rand.Uint64(), + RequestId: id, GetBlockHeadersPacket: &query, }) } @@ -432,8 +435,11 @@ func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, re Reverse: reverse, } if p.Version() >= ETH66 { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ - RequestId: rand.Uint64(), + RequestId: id, GetBlockHeadersPacket: &query, }) } @@ -451,8 +457,11 @@ func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, rever Reverse: reverse, } if p.Version() >= ETH66 { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ - RequestId: rand.Uint64(), + RequestId: id, GetBlockHeadersPacket: &query, }) } @@ -476,8 +485,11 @@ func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int, func (p *Peer) RequestBodies(hashes []common.Hash) error { p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) if p.Version() >= ETH66 { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id) return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{ - RequestId: rand.Uint64(), + RequestId: id, GetBlockBodiesPacket: hashes, }) } @@ -489,8 +501,11 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error { func (p *Peer) RequestNodeData(hashes []common.Hash) error { p.Log().Debug("Fetching batch of state data", "count", len(hashes)) if p.Version() >= ETH66 { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id) return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{ - RequestId: rand.Uint64(), + RequestId: id, GetNodeDataPacket: hashes, }) } @@ -501,8 +516,11 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error { func (p *Peer) RequestReceipts(hashes []common.Hash) error { p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) if p.Version() >= ETH66 { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id) return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{ - RequestId: rand.Uint64(), + RequestId: id, GetReceiptsPacket: hashes, }) } @@ -513,8 +531,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error { func (p *Peer) RequestTxs(hashes []common.Hash) error { p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) if p.Version() >= ETH66 { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id) return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{ - RequestId: rand.Uint64(), + RequestId: id, GetPooledTransactionsPacket: hashes, }) } diff --git a/eth/protocols/eth/tracker.go b/eth/protocols/eth/tracker.go new file mode 100644 index 000000000..324fd2283 --- /dev/null +++ b/eth/protocols/eth/tracker.go @@ -0,0 +1,26 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package eth + +import ( + "time" + + "github.com/ethereum/go-ethereum/p2p/tracker" +) + +// requestTracker is a singleton tracker for eth/66 and newer request times. +var requestTracker = tracker.New(ProtocolName, 5*time.Minute) diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go index f7939964f..4c12adfa8 100644 --- a/eth/protocols/snap/handler.go +++ b/eth/protocols/snap/handler.go @@ -227,6 +227,8 @@ func handleMessage(backend Backend, peer *Peer) error { return fmt.Errorf("accounts not monotonically increasing: #%d [%x] vs #%d [%x]", i-1, res.Accounts[i-1].Hash[:], i, res.Accounts[i].Hash[:]) } } + requestTracker.Fulfil(peer.id, peer.version, AccountRangeMsg, res.ID) + return backend.Handle(peer, res) case msg.Code == GetStorageRangesMsg: @@ -360,6 +362,8 @@ func handleMessage(backend Backend, peer *Peer) error { } } } + requestTracker.Fulfil(peer.id, peer.version, StorageRangesMsg, res.ID) + return backend.Handle(peer, res) case msg.Code == GetByteCodesMsg: @@ -404,6 +408,8 @@ func handleMessage(backend Backend, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + requestTracker.Fulfil(peer.id, peer.version, ByteCodesMsg, res.ID) + return backend.Handle(peer, res) case msg.Code == GetTrieNodesMsg: @@ -497,6 +503,8 @@ func handleMessage(backend Backend, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + requestTracker.Fulfil(peer.id, peer.version, TrieNodesMsg, res.ID) + return backend.Handle(peer, res) default: diff --git a/eth/protocols/snap/peer.go b/eth/protocols/snap/peer.go index 4f3d550f1..cf0ce65bd 100644 --- a/eth/protocols/snap/peer.go +++ b/eth/protocols/snap/peer.go @@ -65,6 +65,8 @@ func (p *Peer) Log() log.Logger { // trie, starting with the origin. func (p *Peer) RequestAccountRange(id uint64, root common.Hash, origin, limit common.Hash, bytes uint64) error { p.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes)) + + requestTracker.Track(p.id, p.version, GetAccountRangeMsg, AccountRangeMsg, id) return p2p.Send(p.rw, GetAccountRangeMsg, &GetAccountRangePacket{ ID: id, Root: root, @@ -83,6 +85,7 @@ func (p *Peer) RequestStorageRanges(id uint64, root common.Hash, accounts []comm } else { p.logger.Trace("Fetching ranges of small storage slots", "reqid", id, "root", root, "accounts", len(accounts), "first", accounts[0], "bytes", common.StorageSize(bytes)) } + requestTracker.Track(p.id, p.version, GetStorageRangesMsg, StorageRangesMsg, id) return p2p.Send(p.rw, GetStorageRangesMsg, &GetStorageRangesPacket{ ID: id, Root: root, @@ -96,6 +99,8 @@ func (p *Peer) RequestStorageRanges(id uint64, root common.Hash, accounts []comm // RequestByteCodes fetches a batch of bytecodes by hash. func (p *Peer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error { p.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes)) + + requestTracker.Track(p.id, p.version, GetByteCodesMsg, ByteCodesMsg, id) return p2p.Send(p.rw, GetByteCodesMsg, &GetByteCodesPacket{ ID: id, Hashes: hashes, @@ -107,6 +112,8 @@ func (p *Peer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) e // a specificstate trie. func (p *Peer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error { p.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes)) + + requestTracker.Track(p.id, p.version, GetTrieNodesMsg, TrieNodesMsg, id) return p2p.Send(p.rw, GetTrieNodesMsg, &GetTrieNodesPacket{ ID: id, Root: root, diff --git a/eth/protocols/snap/tracker.go b/eth/protocols/snap/tracker.go new file mode 100644 index 000000000..2cf59cc23 --- /dev/null +++ b/eth/protocols/snap/tracker.go @@ -0,0 +1,26 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snap + +import ( + "time" + + "github.com/ethereum/go-ethereum/p2p/tracker" +) + +// requestTracker is a singleton tracker for request times. +var requestTracker = tracker.New(ProtocolName, time.Minute) diff --git a/p2p/metrics.go b/p2p/metrics.go index be0d2f495..1bb505cdf 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -33,9 +33,6 @@ const ( // HandleHistName is the prefix of the per-packet serving time histograms. HandleHistName = "p2p/handle" - - // WaitHistName is the prefix of the per-packet (req only) waiting time histograms. - WaitHistName = "p2p/wait" ) var ( diff --git a/p2p/tracker/tracker.go b/p2p/tracker/tracker.go new file mode 100644 index 000000000..b50a952f6 --- /dev/null +++ b/p2p/tracker/tracker.go @@ -0,0 +1,203 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package tracker + +import ( + "container/list" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + // trackedGaugeName is the prefix of the per-packet request tracking. + trackedGaugeName = "p2p/tracked" + + // lostMeterName is the prefix of the per-packet request expirations. + lostMeterName = "p2p/lost" + + // staleMeterName is the prefix of the per-packet stale responses. + staleMeterName = "p2p/stale" + + // waitHistName is the prefix of the per-packet (req only) waiting time histograms. + waitHistName = "p2p/wait" + + // maxTrackedPackets is a huge number to act as a failsafe on the number of + // pending requests the node will track. It should never be hit unless an + // attacker figures out a way to spin requests. + maxTrackedPackets = 100000 +) + +// request tracks sent network requests which have not yet received a response. +type request struct { + peer string + version uint // Protocol version + + reqCode uint64 // Protocol message code of the request + resCode uint64 // Protocol message code of the expected response + + time time.Time // Timestamp when the request was made + expire *list.Element // Expiration marker to untrack it +} + +// Tracker is a pending network request tracker to measure how much time it takes +// a remote peer to respond. +type Tracker struct { + protocol string // Protocol capability identifier for the metrics + timeout time.Duration // Global timeout after which to drop a tracked packet + + pending map[uint64]*request // Currently pending requests + expire *list.List // Linked list tracking the expiration order + wake *time.Timer // Timer tracking the expiration of the next item + + lock sync.Mutex // Lock protecting from concurrent updates +} + +// New creates a new network request tracker to monitor how much time it takes to +// fill certain requests and how individual peers perform. +func New(protocol string, timeout time.Duration) *Tracker { + return &Tracker{ + protocol: protocol, + timeout: timeout, + pending: make(map[uint64]*request), + expire: list.New(), + } +} + +// Track adds a network request to the tracker to wait for a response to arrive +// or until the request it cancelled or times out. +func (t *Tracker) Track(peer string, version uint, reqCode uint64, resCode uint64, id uint64) { + if !metrics.Enabled { + return + } + t.lock.Lock() + defer t.lock.Unlock() + + // If there's a duplicate request, we've just random-collided (or more probably, + // we have a bug), report it. We could also add a metric, but we're not really + // expecting ourselves to be buggy, so a noisy warning should be enough. + if _, ok := t.pending[id]; ok { + log.Error("Network request id collision", "protocol", t.protocol, "version", version, "code", reqCode, "id", id) + return + } + // If we have too many pending requests, bail out instead of leaking memory + if pending := len(t.pending); pending >= maxTrackedPackets { + log.Error("Request tracker exceeded allowance", "pending", pending, "peer", peer, "protocol", t.protocol, "version", version, "code", reqCode) + return + } + // Id doesn't exist yet, start tracking it + t.pending[id] = &request{ + peer: peer, + version: version, + reqCode: reqCode, + resCode: resCode, + time: time.Now(), + expire: t.expire.PushBack(id), + } + g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, version, reqCode) + metrics.GetOrRegisterGauge(g, nil).Inc(1) + + // If we've just inserted the first item, start the expiration timer + if t.wake == nil { + t.wake = time.AfterFunc(t.timeout, t.clean) + } +} + +// clean is called automatically when a preset time passes without a response +// being dleivered for the first network request. +func (t *Tracker) clean() { + t.lock.Lock() + defer t.lock.Unlock() + + // Expire anything within a certain threshold (might be no items at all if + // we raced with the delivery) + for t.expire.Len() > 0 { + // Stop iterating if the next pending request is still alive + var ( + head = t.expire.Front() + id = head.Value.(uint64) + req = t.pending[id] + ) + if time.Since(req.time) < t.timeout+5*time.Millisecond { + break + } + // Nope, dead, drop it + t.expire.Remove(head) + delete(t.pending, id) + + g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode) + metrics.GetOrRegisterGauge(g, nil).Dec(1) + + m := fmt.Sprintf("%s/%s/%d/%#02x", lostMeterName, t.protocol, req.version, req.reqCode) + metrics.GetOrRegisterMeter(m, nil).Mark(1) + } + t.schedule() +} + +// schedule starts a timer to trigger on the expiration of the first network +// packet. +func (t *Tracker) schedule() { + if t.expire.Len() == 0 { + t.wake = nil + return + } + t.wake = time.AfterFunc(time.Until(t.pending[t.expire.Front().Value.(uint64)].time.Add(t.timeout)), t.clean) +} + +// Fulfil fills a pending request, if any is available, reporting on various metrics. +func (t *Tracker) Fulfil(peer string, version uint, code uint64, id uint64) { + if !metrics.Enabled { + return + } + t.lock.Lock() + defer t.lock.Unlock() + + // If it's a non existing request, track as stale response + req, ok := t.pending[id] + if !ok { + m := fmt.Sprintf("%s/%s/%d/%#02x", staleMeterName, t.protocol, version, code) + metrics.GetOrRegisterMeter(m, nil).Mark(1) + return + } + // If the response is funky, it might be some active attack + if req.peer != peer || req.version != version || req.resCode != code { + log.Warn("Network response id collision", + "have", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, version, code), + "want", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, req.version, req.resCode), + ) + return + } + // Everything matches, mark the request serviced and meter it + t.expire.Remove(req.expire) + if req.expire.Prev() == nil { + t.wake.Stop() + t.schedule() + } + g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode) + metrics.GetOrRegisterGauge(g, nil).Dec(1) + + h := fmt.Sprintf("%s/%s/%d/%#02x", waitHistName, t.protocol, req.version, req.reqCode) + sampler := func() metrics.Sample { + return metrics.ResettingSample( + metrics.NewExpDecaySample(1028, 0.015), + ) + } + metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(req.time).Microseconds()) +}