From a251bca67ce2355f5262d116f882617317033dd6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 3 Jan 2023 12:36:38 +0100 Subject: [PATCH] p2p/discover: add more packet information in logs (#26307) * p2p/discover: add more packet information in logs This adds more fields to discv5 packet logs. These can be useful when debugging multi-packet interactions. The FINDNODE message also gets an additional field, OpID for debugging purposes. This field is not encoded onto the wire. I'm also removing topic system related message types in this change. These will come back in the future, where support for them will be guarded by a config flag. * p2p/discover/v5wire: rename 'Total' to 'RespCount' The new name captures the meaning of this field better. --- cmd/devp2p/internal/v5test/discv5tests.go | 2 +- cmd/devp2p/internal/v5test/framework.go | 12 +- p2p/discover/v5_udp.go | 27 +++-- p2p/discover/v5_udp_test.go | 28 ++--- p2p/discover/v5wire/encoding_test.go | 8 +- p2p/discover/v5wire/msg.go | 140 ++++++++++------------ 6 files changed, 105 insertions(+), 112 deletions(-) diff --git a/cmd/devp2p/internal/v5test/discv5tests.go b/cmd/devp2p/internal/v5test/discv5tests.go index a7cd35276..56624a0ca 100644 --- a/cmd/devp2p/internal/v5test/discv5tests.go +++ b/cmd/devp2p/internal/v5test/discv5tests.go @@ -355,7 +355,7 @@ func (bn *bystander) loop() { wasAdded = true bn.notifyAdded() case *v5wire.Findnode: - bn.conn.write(bn.l, &v5wire.Nodes{ReqID: p.ReqID, Total: 1}, nil) + bn.conn.write(bn.l, &v5wire.Nodes{ReqID: p.ReqID, RespCount: 1}, nil) wasAdded = true bn.notifyAdded() case *v5wire.TalkRequest: diff --git a/cmd/devp2p/internal/v5test/framework.go b/cmd/devp2p/internal/v5test/framework.go index f31677e51..10856a50b 100644 --- a/cmd/devp2p/internal/v5test/framework.go +++ b/cmd/devp2p/internal/v5test/framework.go @@ -44,6 +44,8 @@ func (p *readError) Unwrap() error { return p.err } func (p *readError) RequestID() []byte { return nil } func (p *readError) SetRequestID([]byte) {} +func (p *readError) AppendLogInfo(ctx []interface{}) []interface{} { return ctx } + // readErrorf creates a readError with the given text. func readErrorf(format string, args ...interface{}) *readError { return &readError{fmt.Errorf(format, args...)} @@ -171,16 +173,16 @@ func (tc *conn) findnode(c net.PacketConn, dists []uint) ([]*enode.Node, error) // Check total count. It should be greater than one // and needs to be the same across all responses. if first { - if resp.Total == 0 || resp.Total > 6 { - return nil, fmt.Errorf("invalid NODES response 'total' %d (not in (0,7))", resp.Total) + if resp.RespCount == 0 || resp.RespCount > 6 { + return nil, fmt.Errorf("invalid NODES response count %d (not in (0,7))", resp.RespCount) } - total = resp.Total + total = resp.RespCount n = int(total) - 1 first = false } else { n-- - if resp.Total != total { - return nil, fmt.Errorf("invalid NODES response 'total' %d (!= %d)", resp.Total, total) + if resp.RespCount != total { + return nil, fmt.Errorf("invalid NODES response count %d (!= %d)", resp.RespCount, total) } } // Check nodes. diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 57d624498..53a1c6f76 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -70,6 +70,9 @@ type UDPv5 struct { clock mclock.Clock validSchemes enr.IdentityScheme + // misc buffers used during message handling + logcontext []interface{} + // talkreq handler registry trlock sync.Mutex trhandlers map[string]TalkRequestHandler @@ -158,6 +161,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { activeCallByNode: make(map[enode.ID]*callV5), activeCallByAuth: make(map[v5wire.Nonce]*callV5), callQueue: make(map[enode.ID][]*callV5), + // shutdown closeCtx: closeCtx, cancelCloseCtx: cancelCloseCtx, @@ -385,7 +389,7 @@ func (t *UDPv5) waitForNodes(c *callV5, distances []uint) ([]*enode.Node, error) nodes = append(nodes, node) } if total == -1 { - total = min(int(response.Total), totalNodesResponseLimit) + total = min(int(response.RespCount), totalNodesResponseLimit) } if received++; received == total { return nodes, nil @@ -601,13 +605,18 @@ func (t *UDPv5) sendResponse(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.P // send sends a packet to the given node. func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet, c *v5wire.Whoareyou) (v5wire.Nonce, error) { addr := toAddr.String() + t.logcontext = append(t.logcontext[:0], "id", toID, "addr", addr) + t.logcontext = packet.AppendLogInfo(t.logcontext) + enc, nonce, err := t.codec.Encode(toID, addr, packet, c) if err != nil { - t.log.Warn(">> "+packet.Name(), "id", toID, "addr", addr, "err", err) + t.logcontext = append(t.logcontext, "err", err) + t.log.Warn(">> "+packet.Name(), t.logcontext...) return nonce, err } + _, err = t.conn.WriteToUDP(enc, toAddr) - t.log.Trace(">> "+packet.Name(), "id", toID, "addr", addr) + t.log.Trace(">> "+packet.Name(), t.logcontext...) return nonce, err } @@ -657,7 +666,9 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error { } if packet.Kind() != v5wire.WhoareyouPacket { // WHOAREYOU logged separately to report errors. - t.log.Trace("<< "+packet.Name(), "id", fromID, "addr", addr) + t.logcontext = append(t.logcontext[:0], "id", fromID, "addr", addr) + t.logcontext = packet.AppendLogInfo(t.logcontext) + t.log.Trace("<< "+packet.Name(), t.logcontext...) } t.handle(packet, fromID, fromAddr) return nil @@ -712,7 +723,7 @@ func (t *UDPv5) handle(p v5wire.Packet, fromID enode.ID, fromAddr *net.UDPAddr) case *v5wire.Nodes: t.handleCallResponse(fromID, fromAddr, p) case *v5wire.TalkRequest: - t.handleTalkRequest(p, fromID, fromAddr) + t.handleTalkRequest(fromID, fromAddr, p) case *v5wire.TalkResponse: t.handleCallResponse(fromID, fromAddr, p) } @@ -827,7 +838,7 @@ func (t *UDPv5) collectTableNodes(rip net.IP, distances []uint, limit int) []*en // packNodes creates NODES response packets for the given node list. func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes { if len(nodes) == 0 { - return []*v5wire.Nodes{{ReqID: reqid, Total: 1}} + return []*v5wire.Nodes{{ReqID: reqid, RespCount: 1}} } // This limit represents the available space for nodes in output packets. Maximum @@ -851,13 +862,13 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes { resp = append(resp, p) } for _, msg := range resp { - msg.Total = uint8(len(resp)) + msg.RespCount = uint8(len(resp)) } return resp } // handleTalkRequest runs the talk request handler of the requested protocol. -func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAddr *net.UDPAddr) { +func (t *UDPv5) handleTalkRequest(fromID enode.ID, fromAddr *net.UDPAddr, p *v5wire.TalkRequest) { t.trlock.Lock() handler := t.trhandlers[p.Protocol] t.trlock.Unlock() diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index ab0cb9a82..481bb1cdc 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -208,8 +208,8 @@ func (test *udpV5Test) expectNodes(wantReqID []byte, wantTotal uint8, wantNodes if !bytes.Equal(p.ReqID, wantReqID) { test.t.Fatalf("wrong request ID %v in response, want %v", p.ReqID, wantReqID) } - if p.Total != wantTotal { - test.t.Fatalf("wrong total response count %d, want %d", p.Total, wantTotal) + if p.RespCount != wantTotal { + test.t.Fatalf("wrong total response count %d, want %d", p.RespCount, wantTotal) } for _, record := range p.Nodes { n, _ := enode.New(enode.ValidSchemesForTesting, record) @@ -301,14 +301,14 @@ func TestUDPv5_findnodeCall(t *testing.T) { t.Fatalf("wrong distances in request: %v", p.Distances) } test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[:4]), + ReqID: p.ReqID, + RespCount: 2, + Nodes: nodesToRecords(nodes[:4]), }) test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[4:]), + ReqID: p.ReqID, + RespCount: 2, + Nodes: nodesToRecords(nodes[4:]), }) }) @@ -409,16 +409,16 @@ func TestUDPv5_callTimeoutReset(t *testing.T) { test.waitPacketOut(func(p *v5wire.Findnode, addr *net.UDPAddr, _ v5wire.Nonce) { time.Sleep(respTimeout - 50*time.Millisecond) test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[:4]), + ReqID: p.ReqID, + RespCount: 2, + Nodes: nodesToRecords(nodes[:4]), }) time.Sleep(respTimeout - 50*time.Millisecond) test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[4:]), + ReqID: p.ReqID, + RespCount: 2, + Nodes: nodesToRecords(nodes[4:]), }) }) if err := <-done; err != nil { diff --git a/p2p/discover/v5wire/encoding_test.go b/p2p/discover/v5wire/encoding_test.go index 25df73283..a5387311a 100644 --- a/p2p/discover/v5wire/encoding_test.go +++ b/p2p/discover/v5wire/encoding_test.go @@ -92,7 +92,7 @@ func TestHandshake(t *testing.T) { } // A <- B NODES - nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1}) + nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1}) net.nodeA.expectDecode(t, NodesMsg, nodes) } @@ -150,7 +150,7 @@ func TestHandshake_norecord(t *testing.T) { net.nodeB.expectDecode(t, FindnodeMsg, findnode) // A <- B NODES - nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1}) + nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1}) net.nodeA.expectDecode(t, NodesMsg, nodes) } @@ -190,7 +190,7 @@ func TestHandshake_rekey(t *testing.T) { net.nodeB.expectDecode(t, FindnodeMsg, findnode) // A <- B NODES - nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1}) + nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1}) net.nodeA.expectDecode(t, NodesMsg, nodes) } @@ -225,7 +225,7 @@ func TestHandshake_rekey2(t *testing.T) { net.nodeB.expectDecode(t, FindnodeMsg, findnode) // A <- B NODES - nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1}) + nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1}) net.nodeA.expectDecode(t, NodesMsg, nodes) } diff --git a/p2p/discover/v5wire/msg.go b/p2p/discover/v5wire/msg.go index 1316598a4..fb8e1e12c 100644 --- a/p2p/discover/v5wire/msg.go +++ b/p2p/discover/v5wire/msg.go @@ -20,6 +20,7 @@ import ( "fmt" "net" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" @@ -32,6 +33,10 @@ type Packet interface { Kind() byte // Kind returns the message type. RequestID() []byte // Returns the request ID. SetRequestID([]byte) // Sets the request ID. + + // AppendLogInfo returns its argument 'ctx' with additional fields + // appended for logging purposes. + AppendLogInfo(ctx []interface{}) []interface{} } // Message types. @@ -44,9 +49,6 @@ const ( TalkResponseMsg RequestTicketMsg TicketMsg - RegtopicMsg - RegconfirmationMsg - TopicQueryMsg UnknownPacket = byte(255) // any non-decryptable packet WhoareyouPacket = byte(254) // the WHOAREYOU packet @@ -59,7 +61,7 @@ type ( Nonce Nonce } - // Whoareyou contains the handshake challenge. + // WHOAREYOU contains the handshake challenge. Whoareyou struct { ChallengeData []byte // Encoded challenge Nonce Nonce // Nonce of request packet @@ -73,13 +75,13 @@ type ( sent mclock.AbsTime // for handshake GC. } - // Ping is sent during liveness checks. + // PING is sent during liveness checks. Ping struct { ReqID []byte ENRSeq uint64 } - // Pong is the reply to Ping. + // PONG is the reply to PING. Pong struct { ReqID []byte ENRSeq uint64 @@ -87,62 +89,35 @@ type ( ToPort uint16 // packet, which provides a way to discover the external address (after NAT). } - // Findnode is a query for nodes in the given bucket. + // FINDNODE is a query for nodes in the given bucket. Findnode struct { ReqID []byte Distances []uint + + // OpID is for debugging purposes and is not part of the packet encoding. + // It identifies the 'operation' on behalf of which the request was sent. + OpID uint64 `rlp:"-"` } - // Nodes is the reply to Findnode and Topicquery. + // NODES is a response to FINDNODE. Nodes struct { - ReqID []byte - Total uint8 - Nodes []*enr.Record + ReqID []byte + RespCount uint8 // total number of responses to the request + Nodes []*enr.Record } - // TalkRequest is an application-level request. + // TALKREQ is an application-level request. TalkRequest struct { ReqID []byte Protocol string Message []byte } - // TalkResponse is the reply to TalkRequest. + // TALKRESP is the reply to TALKREQ. TalkResponse struct { ReqID []byte Message []byte } - - // RequestTicket requests a ticket for a topic queue. - RequestTicket struct { - ReqID []byte - Topic []byte - } - - // Ticket is the response to RequestTicket. - Ticket struct { - ReqID []byte - Ticket []byte - } - - // Regtopic registers the sender in a topic queue using a ticket. - Regtopic struct { - ReqID []byte - Ticket []byte - ENR *enr.Record - } - - // Regconfirmation is the reply to Regtopic. - Regconfirmation struct { - ReqID []byte - Registered bool - } - - // TopicQuery asks for nodes with the given topic. - TopicQuery struct { - ReqID []byte - Topic []byte - } ) // DecodeMessage decodes the message body of a packet. @@ -161,16 +136,6 @@ func DecodeMessage(ptype byte, body []byte) (Packet, error) { dec = new(TalkRequest) case TalkResponseMsg: dec = new(TalkResponse) - case RequestTicketMsg: - dec = new(RequestTicket) - case TicketMsg: - dec = new(Ticket) - case RegtopicMsg: - dec = new(Regtopic) - case RegconfirmationMsg: - dec = new(Regconfirmation) - case TopicQueryMsg: - dec = new(TopicQuery) default: return nil, fmt.Errorf("unknown packet type %d", ptype) } @@ -188,62 +153,77 @@ func (*Whoareyou) Kind() byte { return WhoareyouPacket } func (*Whoareyou) RequestID() []byte { return nil } func (*Whoareyou) SetRequestID([]byte) {} +func (*Whoareyou) AppendLogInfo(ctx []interface{}) []interface{} { + return ctx +} + func (*Unknown) Name() string { return "UNKNOWN/v5" } func (*Unknown) Kind() byte { return UnknownPacket } func (*Unknown) RequestID() []byte { return nil } func (*Unknown) SetRequestID([]byte) {} +func (*Unknown) AppendLogInfo(ctx []interface{}) []interface{} { + return ctx +} + func (*Ping) Name() string { return "PING/v5" } func (*Ping) Kind() byte { return PingMsg } func (p *Ping) RequestID() []byte { return p.ReqID } func (p *Ping) SetRequestID(id []byte) { p.ReqID = id } +func (p *Ping) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, "req", hexutil.Bytes(p.ReqID), "enrseq", p.ENRSeq) +} + func (*Pong) Name() string { return "PONG/v5" } func (*Pong) Kind() byte { return PongMsg } func (p *Pong) RequestID() []byte { return p.ReqID } func (p *Pong) SetRequestID(id []byte) { p.ReqID = id } -func (*Findnode) Name() string { return "FINDNODE/v5" } -func (*Findnode) Kind() byte { return FindnodeMsg } +func (p *Pong) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, "req", hexutil.Bytes(p.ReqID), "enrseq", p.ENRSeq) +} + +func (p *Findnode) Name() string { return "FINDNODE/v5" } +func (p *Findnode) Kind() byte { return FindnodeMsg } func (p *Findnode) RequestID() []byte { return p.ReqID } func (p *Findnode) SetRequestID(id []byte) { p.ReqID = id } +func (p *Findnode) AppendLogInfo(ctx []interface{}) []interface{} { + ctx = append(ctx, "req", hexutil.Bytes(p.ReqID)) + if p.OpID != 0 { + ctx = append(ctx, "opid", p.OpID) + } + return ctx +} + func (*Nodes) Name() string { return "NODES/v5" } func (*Nodes) Kind() byte { return NodesMsg } func (p *Nodes) RequestID() []byte { return p.ReqID } func (p *Nodes) SetRequestID(id []byte) { p.ReqID = id } +func (p *Nodes) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, + "req", hexutil.Bytes(p.ReqID), + "tot", p.RespCount, + "n", len(p.Nodes), + ) +} + func (*TalkRequest) Name() string { return "TALKREQ/v5" } func (*TalkRequest) Kind() byte { return TalkRequestMsg } func (p *TalkRequest) RequestID() []byte { return p.ReqID } func (p *TalkRequest) SetRequestID(id []byte) { p.ReqID = id } +func (p *TalkRequest) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, "proto", p.Protocol, "reqid", hexutil.Bytes(p.ReqID), "len", len(p.Message)) +} + func (*TalkResponse) Name() string { return "TALKRESP/v5" } func (*TalkResponse) Kind() byte { return TalkResponseMsg } func (p *TalkResponse) RequestID() []byte { return p.ReqID } func (p *TalkResponse) SetRequestID(id []byte) { p.ReqID = id } -func (*RequestTicket) Name() string { return "REQTICKET/v5" } -func (*RequestTicket) Kind() byte { return RequestTicketMsg } -func (p *RequestTicket) RequestID() []byte { return p.ReqID } -func (p *RequestTicket) SetRequestID(id []byte) { p.ReqID = id } - -func (*Regtopic) Name() string { return "REGTOPIC/v5" } -func (*Regtopic) Kind() byte { return RegtopicMsg } -func (p *Regtopic) RequestID() []byte { return p.ReqID } -func (p *Regtopic) SetRequestID(id []byte) { p.ReqID = id } - -func (*Ticket) Name() string { return "TICKET/v5" } -func (*Ticket) Kind() byte { return TicketMsg } -func (p *Ticket) RequestID() []byte { return p.ReqID } -func (p *Ticket) SetRequestID(id []byte) { p.ReqID = id } - -func (*Regconfirmation) Name() string { return "REGCONFIRMATION/v5" } -func (*Regconfirmation) Kind() byte { return RegconfirmationMsg } -func (p *Regconfirmation) RequestID() []byte { return p.ReqID } -func (p *Regconfirmation) SetRequestID(id []byte) { p.ReqID = id } - -func (*TopicQuery) Name() string { return "TOPICQUERY/v5" } -func (*TopicQuery) Kind() byte { return TopicQueryMsg } -func (p *TopicQuery) RequestID() []byte { return p.ReqID } -func (p *TopicQuery) SetRequestID(id []byte) { p.ReqID = id } +func (p *TalkResponse) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, "req", p.ReqID, "len", len(p.Message)) +}