diff --git a/cmd/devp2p/internal/v5test/discv5tests.go b/cmd/devp2p/internal/v5test/discv5tests.go index a7cd35276..56624a0ca 100644 --- a/cmd/devp2p/internal/v5test/discv5tests.go +++ b/cmd/devp2p/internal/v5test/discv5tests.go @@ -355,7 +355,7 @@ func (bn *bystander) loop() { wasAdded = true bn.notifyAdded() case *v5wire.Findnode: - bn.conn.write(bn.l, &v5wire.Nodes{ReqID: p.ReqID, Total: 1}, nil) + bn.conn.write(bn.l, &v5wire.Nodes{ReqID: p.ReqID, RespCount: 1}, nil) wasAdded = true bn.notifyAdded() case *v5wire.TalkRequest: diff --git a/cmd/devp2p/internal/v5test/framework.go b/cmd/devp2p/internal/v5test/framework.go index f31677e51..10856a50b 100644 --- a/cmd/devp2p/internal/v5test/framework.go +++ b/cmd/devp2p/internal/v5test/framework.go @@ -44,6 +44,8 @@ func (p *readError) Unwrap() error { return p.err } func (p *readError) RequestID() []byte { return nil } func (p *readError) SetRequestID([]byte) {} +func (p *readError) AppendLogInfo(ctx []interface{}) []interface{} { return ctx } + // readErrorf creates a readError with the given text. func readErrorf(format string, args ...interface{}) *readError { return &readError{fmt.Errorf(format, args...)} @@ -171,16 +173,16 @@ func (tc *conn) findnode(c net.PacketConn, dists []uint) ([]*enode.Node, error) // Check total count. It should be greater than one // and needs to be the same across all responses. if first { - if resp.Total == 0 || resp.Total > 6 { - return nil, fmt.Errorf("invalid NODES response 'total' %d (not in (0,7))", resp.Total) + if resp.RespCount == 0 || resp.RespCount > 6 { + return nil, fmt.Errorf("invalid NODES response count %d (not in (0,7))", resp.RespCount) } - total = resp.Total + total = resp.RespCount n = int(total) - 1 first = false } else { n-- - if resp.Total != total { - return nil, fmt.Errorf("invalid NODES response 'total' %d (!= %d)", resp.Total, total) + if resp.RespCount != total { + return nil, fmt.Errorf("invalid NODES response count %d (!= %d)", resp.RespCount, total) } } // Check nodes. diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 57d624498..53a1c6f76 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -70,6 +70,9 @@ type UDPv5 struct { clock mclock.Clock validSchemes enr.IdentityScheme + // misc buffers used during message handling + logcontext []interface{} + // talkreq handler registry trlock sync.Mutex trhandlers map[string]TalkRequestHandler @@ -158,6 +161,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { activeCallByNode: make(map[enode.ID]*callV5), activeCallByAuth: make(map[v5wire.Nonce]*callV5), callQueue: make(map[enode.ID][]*callV5), + // shutdown closeCtx: closeCtx, cancelCloseCtx: cancelCloseCtx, @@ -385,7 +389,7 @@ func (t *UDPv5) waitForNodes(c *callV5, distances []uint) ([]*enode.Node, error) nodes = append(nodes, node) } if total == -1 { - total = min(int(response.Total), totalNodesResponseLimit) + total = min(int(response.RespCount), totalNodesResponseLimit) } if received++; received == total { return nodes, nil @@ -601,13 +605,18 @@ func (t *UDPv5) sendResponse(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.P // send sends a packet to the given node. func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet, c *v5wire.Whoareyou) (v5wire.Nonce, error) { addr := toAddr.String() + t.logcontext = append(t.logcontext[:0], "id", toID, "addr", addr) + t.logcontext = packet.AppendLogInfo(t.logcontext) + enc, nonce, err := t.codec.Encode(toID, addr, packet, c) if err != nil { - t.log.Warn(">> "+packet.Name(), "id", toID, "addr", addr, "err", err) + t.logcontext = append(t.logcontext, "err", err) + t.log.Warn(">> "+packet.Name(), t.logcontext...) return nonce, err } + _, err = t.conn.WriteToUDP(enc, toAddr) - t.log.Trace(">> "+packet.Name(), "id", toID, "addr", addr) + t.log.Trace(">> "+packet.Name(), t.logcontext...) return nonce, err } @@ -657,7 +666,9 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error { } if packet.Kind() != v5wire.WhoareyouPacket { // WHOAREYOU logged separately to report errors. - t.log.Trace("<< "+packet.Name(), "id", fromID, "addr", addr) + t.logcontext = append(t.logcontext[:0], "id", fromID, "addr", addr) + t.logcontext = packet.AppendLogInfo(t.logcontext) + t.log.Trace("<< "+packet.Name(), t.logcontext...) } t.handle(packet, fromID, fromAddr) return nil @@ -712,7 +723,7 @@ func (t *UDPv5) handle(p v5wire.Packet, fromID enode.ID, fromAddr *net.UDPAddr) case *v5wire.Nodes: t.handleCallResponse(fromID, fromAddr, p) case *v5wire.TalkRequest: - t.handleTalkRequest(p, fromID, fromAddr) + t.handleTalkRequest(fromID, fromAddr, p) case *v5wire.TalkResponse: t.handleCallResponse(fromID, fromAddr, p) } @@ -827,7 +838,7 @@ func (t *UDPv5) collectTableNodes(rip net.IP, distances []uint, limit int) []*en // packNodes creates NODES response packets for the given node list. func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes { if len(nodes) == 0 { - return []*v5wire.Nodes{{ReqID: reqid, Total: 1}} + return []*v5wire.Nodes{{ReqID: reqid, RespCount: 1}} } // This limit represents the available space for nodes in output packets. Maximum @@ -851,13 +862,13 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes { resp = append(resp, p) } for _, msg := range resp { - msg.Total = uint8(len(resp)) + msg.RespCount = uint8(len(resp)) } return resp } // handleTalkRequest runs the talk request handler of the requested protocol. -func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAddr *net.UDPAddr) { +func (t *UDPv5) handleTalkRequest(fromID enode.ID, fromAddr *net.UDPAddr, p *v5wire.TalkRequest) { t.trlock.Lock() handler := t.trhandlers[p.Protocol] t.trlock.Unlock() diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index ab0cb9a82..481bb1cdc 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -208,8 +208,8 @@ func (test *udpV5Test) expectNodes(wantReqID []byte, wantTotal uint8, wantNodes if !bytes.Equal(p.ReqID, wantReqID) { test.t.Fatalf("wrong request ID %v in response, want %v", p.ReqID, wantReqID) } - if p.Total != wantTotal { - test.t.Fatalf("wrong total response count %d, want %d", p.Total, wantTotal) + if p.RespCount != wantTotal { + test.t.Fatalf("wrong total response count %d, want %d", p.RespCount, wantTotal) } for _, record := range p.Nodes { n, _ := enode.New(enode.ValidSchemesForTesting, record) @@ -301,14 +301,14 @@ func TestUDPv5_findnodeCall(t *testing.T) { t.Fatalf("wrong distances in request: %v", p.Distances) } test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[:4]), + ReqID: p.ReqID, + RespCount: 2, + Nodes: nodesToRecords(nodes[:4]), }) test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[4:]), + ReqID: p.ReqID, + RespCount: 2, + Nodes: nodesToRecords(nodes[4:]), }) }) @@ -409,16 +409,16 @@ func TestUDPv5_callTimeoutReset(t *testing.T) { test.waitPacketOut(func(p *v5wire.Findnode, addr *net.UDPAddr, _ v5wire.Nonce) { time.Sleep(respTimeout - 50*time.Millisecond) test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[:4]), + ReqID: p.ReqID, + RespCount: 2, + Nodes: nodesToRecords(nodes[:4]), }) time.Sleep(respTimeout - 50*time.Millisecond) test.packetIn(&v5wire.Nodes{ - ReqID: p.ReqID, - Total: 2, - Nodes: nodesToRecords(nodes[4:]), + ReqID: p.ReqID, + RespCount: 2, + Nodes: nodesToRecords(nodes[4:]), }) }) if err := <-done; err != nil { diff --git a/p2p/discover/v5wire/encoding_test.go b/p2p/discover/v5wire/encoding_test.go index 25df73283..a5387311a 100644 --- a/p2p/discover/v5wire/encoding_test.go +++ b/p2p/discover/v5wire/encoding_test.go @@ -92,7 +92,7 @@ func TestHandshake(t *testing.T) { } // A <- B NODES - nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1}) + nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1}) net.nodeA.expectDecode(t, NodesMsg, nodes) } @@ -150,7 +150,7 @@ func TestHandshake_norecord(t *testing.T) { net.nodeB.expectDecode(t, FindnodeMsg, findnode) // A <- B NODES - nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1}) + nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1}) net.nodeA.expectDecode(t, NodesMsg, nodes) } @@ -190,7 +190,7 @@ func TestHandshake_rekey(t *testing.T) { net.nodeB.expectDecode(t, FindnodeMsg, findnode) // A <- B NODES - nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1}) + nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1}) net.nodeA.expectDecode(t, NodesMsg, nodes) } @@ -225,7 +225,7 @@ func TestHandshake_rekey2(t *testing.T) { net.nodeB.expectDecode(t, FindnodeMsg, findnode) // A <- B NODES - nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1}) + nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1}) net.nodeA.expectDecode(t, NodesMsg, nodes) } diff --git a/p2p/discover/v5wire/msg.go b/p2p/discover/v5wire/msg.go index 1316598a4..fb8e1e12c 100644 --- a/p2p/discover/v5wire/msg.go +++ b/p2p/discover/v5wire/msg.go @@ -20,6 +20,7 @@ import ( "fmt" "net" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" @@ -32,6 +33,10 @@ type Packet interface { Kind() byte // Kind returns the message type. RequestID() []byte // Returns the request ID. SetRequestID([]byte) // Sets the request ID. + + // AppendLogInfo returns its argument 'ctx' with additional fields + // appended for logging purposes. + AppendLogInfo(ctx []interface{}) []interface{} } // Message types. @@ -44,9 +49,6 @@ const ( TalkResponseMsg RequestTicketMsg TicketMsg - RegtopicMsg - RegconfirmationMsg - TopicQueryMsg UnknownPacket = byte(255) // any non-decryptable packet WhoareyouPacket = byte(254) // the WHOAREYOU packet @@ -59,7 +61,7 @@ type ( Nonce Nonce } - // Whoareyou contains the handshake challenge. + // WHOAREYOU contains the handshake challenge. Whoareyou struct { ChallengeData []byte // Encoded challenge Nonce Nonce // Nonce of request packet @@ -73,13 +75,13 @@ type ( sent mclock.AbsTime // for handshake GC. } - // Ping is sent during liveness checks. + // PING is sent during liveness checks. Ping struct { ReqID []byte ENRSeq uint64 } - // Pong is the reply to Ping. + // PONG is the reply to PING. Pong struct { ReqID []byte ENRSeq uint64 @@ -87,62 +89,35 @@ type ( ToPort uint16 // packet, which provides a way to discover the external address (after NAT). } - // Findnode is a query for nodes in the given bucket. + // FINDNODE is a query for nodes in the given bucket. Findnode struct { ReqID []byte Distances []uint + + // OpID is for debugging purposes and is not part of the packet encoding. + // It identifies the 'operation' on behalf of which the request was sent. + OpID uint64 `rlp:"-"` } - // Nodes is the reply to Findnode and Topicquery. + // NODES is a response to FINDNODE. Nodes struct { - ReqID []byte - Total uint8 - Nodes []*enr.Record + ReqID []byte + RespCount uint8 // total number of responses to the request + Nodes []*enr.Record } - // TalkRequest is an application-level request. + // TALKREQ is an application-level request. TalkRequest struct { ReqID []byte Protocol string Message []byte } - // TalkResponse is the reply to TalkRequest. + // TALKRESP is the reply to TALKREQ. TalkResponse struct { ReqID []byte Message []byte } - - // RequestTicket requests a ticket for a topic queue. - RequestTicket struct { - ReqID []byte - Topic []byte - } - - // Ticket is the response to RequestTicket. - Ticket struct { - ReqID []byte - Ticket []byte - } - - // Regtopic registers the sender in a topic queue using a ticket. - Regtopic struct { - ReqID []byte - Ticket []byte - ENR *enr.Record - } - - // Regconfirmation is the reply to Regtopic. - Regconfirmation struct { - ReqID []byte - Registered bool - } - - // TopicQuery asks for nodes with the given topic. - TopicQuery struct { - ReqID []byte - Topic []byte - } ) // DecodeMessage decodes the message body of a packet. @@ -161,16 +136,6 @@ func DecodeMessage(ptype byte, body []byte) (Packet, error) { dec = new(TalkRequest) case TalkResponseMsg: dec = new(TalkResponse) - case RequestTicketMsg: - dec = new(RequestTicket) - case TicketMsg: - dec = new(Ticket) - case RegtopicMsg: - dec = new(Regtopic) - case RegconfirmationMsg: - dec = new(Regconfirmation) - case TopicQueryMsg: - dec = new(TopicQuery) default: return nil, fmt.Errorf("unknown packet type %d", ptype) } @@ -188,62 +153,77 @@ func (*Whoareyou) Kind() byte { return WhoareyouPacket } func (*Whoareyou) RequestID() []byte { return nil } func (*Whoareyou) SetRequestID([]byte) {} +func (*Whoareyou) AppendLogInfo(ctx []interface{}) []interface{} { + return ctx +} + func (*Unknown) Name() string { return "UNKNOWN/v5" } func (*Unknown) Kind() byte { return UnknownPacket } func (*Unknown) RequestID() []byte { return nil } func (*Unknown) SetRequestID([]byte) {} +func (*Unknown) AppendLogInfo(ctx []interface{}) []interface{} { + return ctx +} + func (*Ping) Name() string { return "PING/v5" } func (*Ping) Kind() byte { return PingMsg } func (p *Ping) RequestID() []byte { return p.ReqID } func (p *Ping) SetRequestID(id []byte) { p.ReqID = id } +func (p *Ping) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, "req", hexutil.Bytes(p.ReqID), "enrseq", p.ENRSeq) +} + func (*Pong) Name() string { return "PONG/v5" } func (*Pong) Kind() byte { return PongMsg } func (p *Pong) RequestID() []byte { return p.ReqID } func (p *Pong) SetRequestID(id []byte) { p.ReqID = id } -func (*Findnode) Name() string { return "FINDNODE/v5" } -func (*Findnode) Kind() byte { return FindnodeMsg } +func (p *Pong) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, "req", hexutil.Bytes(p.ReqID), "enrseq", p.ENRSeq) +} + +func (p *Findnode) Name() string { return "FINDNODE/v5" } +func (p *Findnode) Kind() byte { return FindnodeMsg } func (p *Findnode) RequestID() []byte { return p.ReqID } func (p *Findnode) SetRequestID(id []byte) { p.ReqID = id } +func (p *Findnode) AppendLogInfo(ctx []interface{}) []interface{} { + ctx = append(ctx, "req", hexutil.Bytes(p.ReqID)) + if p.OpID != 0 { + ctx = append(ctx, "opid", p.OpID) + } + return ctx +} + func (*Nodes) Name() string { return "NODES/v5" } func (*Nodes) Kind() byte { return NodesMsg } func (p *Nodes) RequestID() []byte { return p.ReqID } func (p *Nodes) SetRequestID(id []byte) { p.ReqID = id } +func (p *Nodes) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, + "req", hexutil.Bytes(p.ReqID), + "tot", p.RespCount, + "n", len(p.Nodes), + ) +} + func (*TalkRequest) Name() string { return "TALKREQ/v5" } func (*TalkRequest) Kind() byte { return TalkRequestMsg } func (p *TalkRequest) RequestID() []byte { return p.ReqID } func (p *TalkRequest) SetRequestID(id []byte) { p.ReqID = id } +func (p *TalkRequest) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, "proto", p.Protocol, "reqid", hexutil.Bytes(p.ReqID), "len", len(p.Message)) +} + func (*TalkResponse) Name() string { return "TALKRESP/v5" } func (*TalkResponse) Kind() byte { return TalkResponseMsg } func (p *TalkResponse) RequestID() []byte { return p.ReqID } func (p *TalkResponse) SetRequestID(id []byte) { p.ReqID = id } -func (*RequestTicket) Name() string { return "REQTICKET/v5" } -func (*RequestTicket) Kind() byte { return RequestTicketMsg } -func (p *RequestTicket) RequestID() []byte { return p.ReqID } -func (p *RequestTicket) SetRequestID(id []byte) { p.ReqID = id } - -func (*Regtopic) Name() string { return "REGTOPIC/v5" } -func (*Regtopic) Kind() byte { return RegtopicMsg } -func (p *Regtopic) RequestID() []byte { return p.ReqID } -func (p *Regtopic) SetRequestID(id []byte) { p.ReqID = id } - -func (*Ticket) Name() string { return "TICKET/v5" } -func (*Ticket) Kind() byte { return TicketMsg } -func (p *Ticket) RequestID() []byte { return p.ReqID } -func (p *Ticket) SetRequestID(id []byte) { p.ReqID = id } - -func (*Regconfirmation) Name() string { return "REGCONFIRMATION/v5" } -func (*Regconfirmation) Kind() byte { return RegconfirmationMsg } -func (p *Regconfirmation) RequestID() []byte { return p.ReqID } -func (p *Regconfirmation) SetRequestID(id []byte) { p.ReqID = id } - -func (*TopicQuery) Name() string { return "TOPICQUERY/v5" } -func (*TopicQuery) Kind() byte { return TopicQueryMsg } -func (p *TopicQuery) RequestID() []byte { return p.ReqID } -func (p *TopicQuery) SetRequestID(id []byte) { p.ReqID = id } +func (p *TalkResponse) AppendLogInfo(ctx []interface{}) []interface{} { + return append(ctx, "req", p.ReqID, "len", len(p.Message)) +}